Merge "taskprocessor: Enable subsystems and overload by subsystem" into 16
This commit is contained in:
commit
8b3579a7e5
9
CHANGES
9
CHANGES
|
@ -20,6 +20,15 @@ ARI
|
|||
types defined in the "disallowed" list are not sent to the application. Note
|
||||
that if a type is specified in both lists "disallowed" takes precedence.
|
||||
|
||||
res_pjsip
|
||||
------------------
|
||||
* A new configuration parameter "taskprocessor_overload_trigger" has been
|
||||
added to the pjsip.conf "globals" section. The distributor currently stops
|
||||
accepting new requests when any taskprocessor overload is triggered. The
|
||||
new option allows you to completely disable overload detection (NOT
|
||||
RECOMMENDED), keep the current behavior, or trigger only on pjsip
|
||||
taskprocessor overloads.
|
||||
|
||||
------------------------------------------------------------------------------
|
||||
--- Functionality changes from Asterisk 16.1.0 to Asterisk 16.2.0 ------------
|
||||
------------------------------------------------------------------------------
|
||||
|
|
|
@ -1137,6 +1137,17 @@
|
|||
; event when a device refreshes its registration
|
||||
; (default: "no")
|
||||
|
||||
;taskprocessor_overload_trigger=global
|
||||
; Set the trigger the distributor will use to detect
|
||||
; taskprocessor overloads. When triggered, the distributor
|
||||
; will not accept any new requests until the overload has
|
||||
; cleared.
|
||||
: "global": (default) Any taskprocessor overload will trigger.
|
||||
; "pjsip_only": Only pjsip taskprocessor overloads will trigger.
|
||||
; "none": No overload detection will be performed.
|
||||
; WARNING: The "none" and "pjsip_only" options should be used
|
||||
; with extreme caution and only to mitigate specific issues.
|
||||
; Under certain conditions they could make things worse.
|
||||
|
||||
; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
|
||||
;==========================ACL SECTION OPTIONS=========================
|
||||
|
|
|
@ -0,0 +1,42 @@
|
|||
"""taskprocessor_overload_trigger
|
||||
|
||||
Revision ID: f3c0b8695b66
|
||||
Revises: 0838f8db6a61
|
||||
Create Date: 2019-02-15 15:03:50.106790
|
||||
|
||||
"""
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision = 'f3c0b8695b66'
|
||||
down_revision = '0838f8db6a61'
|
||||
|
||||
from alembic import op
|
||||
import sqlalchemy as sa
|
||||
from sqlalchemy.dialects.postgresql import ENUM
|
||||
|
||||
PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME = 'pjsip_taskprocessor_overload_trigger_values'
|
||||
PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES = ['none', 'global', 'pjsip_only']
|
||||
|
||||
def upgrade():
|
||||
context = op.get_context()
|
||||
|
||||
if context.bind.dialect.name == 'postgresql':
|
||||
enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
|
||||
name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
|
||||
enum.create(op.get_bind(), checkfirst=False)
|
||||
|
||||
op.add_column('ps_globals',
|
||||
sa.Column('taskprocessor_overload_trigger',
|
||||
sa.Enum(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
|
||||
name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME,
|
||||
create_type=False)))
|
||||
|
||||
def downgrade():
|
||||
if op.get_context().bind.dialect.name == 'mssql':
|
||||
op.drop_constraint('ck_ps_globals_taskprocessor_overload_trigger_pjsip_taskprocessor_overload_trigger_values', 'ps_globals')
|
||||
op.drop_column('ps_globals', 'taskprocessor_overload_trigger')
|
||||
|
||||
if context.bind.dialect.name == 'postgresql':
|
||||
enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
|
||||
name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
|
||||
enum.drop(op.get_bind(), checkfirst=False)
|
|
@ -341,6 +341,19 @@ long ast_taskprocessor_size(struct ast_taskprocessor *tps);
|
|||
*/
|
||||
unsigned int ast_taskprocessor_alert_get(void);
|
||||
|
||||
|
||||
/*!
|
||||
* \brief Get the current taskprocessor high water alert count by sybsystem.
|
||||
* \since 13.26.0
|
||||
* \since 16.3.0
|
||||
*
|
||||
* \param subsystem The subsystem name
|
||||
*
|
||||
* \retval 0 if no taskprocessors are in high water alert.
|
||||
* \retval non-zero if some task processors are in high water alert.
|
||||
*/
|
||||
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem);
|
||||
|
||||
/*!
|
||||
* \brief Set the high and low alert water marks of the given taskprocessor queue.
|
||||
* \since 13.10.0
|
||||
|
|
|
@ -380,7 +380,7 @@ int ast_sorcery_init(void)
|
|||
};
|
||||
ast_assert(wizards == NULL);
|
||||
|
||||
threadpool = ast_threadpool_create("Sorcery", NULL, &options);
|
||||
threadpool = ast_threadpool_create("sorcery", NULL, &options);
|
||||
if (!threadpool) {
|
||||
return -1;
|
||||
}
|
||||
|
|
|
@ -690,7 +690,7 @@ struct stasis_subscription *internal_stasis_subscribe(
|
|||
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
|
||||
|
||||
/* Create name with seq number appended. */
|
||||
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
|
||||
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
|
||||
use_thread_pool ? 'p' : 'm',
|
||||
stasis_topic_name(topic));
|
||||
|
||||
|
@ -2616,7 +2616,7 @@ int stasis_init(void)
|
|||
threadpool_opts.auto_increment = 1;
|
||||
threadpool_opts.max_size = cfg->threadpool_options->max_size;
|
||||
threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
|
||||
pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
|
||||
pool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
|
||||
ao2_ref(cfg, -1);
|
||||
if (!pool) {
|
||||
ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");
|
||||
|
|
|
@ -89,7 +89,11 @@ struct ast_taskprocessor {
|
|||
unsigned int high_water_alert:1;
|
||||
/*! Indicates if the taskprocessor is currently suspended */
|
||||
unsigned int suspended:1;
|
||||
/*! \brief Friendly name of the taskprocessor */
|
||||
/*! \brief Anything before the first '/' in the name (if there is one) */
|
||||
char *subsystem;
|
||||
/*! \brief Friendly name of the taskprocessor.
|
||||
* Subsystem is appended after the name's NULL terminator.
|
||||
*/
|
||||
char name[0];
|
||||
};
|
||||
|
||||
|
@ -112,6 +116,16 @@ struct ast_taskprocessor_listener {
|
|||
void *user_data;
|
||||
};
|
||||
|
||||
/*!
|
||||
* Keep track of which subsystems are in alert
|
||||
* and how many of their taskprocessors are overloaded.
|
||||
*/
|
||||
struct subsystem_alert {
|
||||
unsigned int alert_count;
|
||||
char subsystem[0];
|
||||
};
|
||||
static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
|
||||
|
||||
#ifdef LOW_MEMORY
|
||||
#define TPS_MAX_BUCKETS 61
|
||||
#else
|
||||
|
@ -138,10 +152,12 @@ static int tps_ping_handler(void *datap);
|
|||
|
||||
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
|
||||
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
|
||||
static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
|
||||
|
||||
static struct ast_cli_entry taskprocessor_clis[] = {
|
||||
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
|
||||
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
|
||||
AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
|
||||
};
|
||||
|
||||
struct default_taskprocessor_listener_pvt {
|
||||
|
@ -271,6 +287,8 @@ static const struct ast_taskprocessor_listener_callbacks default_listener_callba
|
|||
static void tps_shutdown(void)
|
||||
{
|
||||
ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
|
||||
AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
|
||||
AST_VECTOR_RW_FREE(&overloaded_subsystems);
|
||||
ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
|
||||
tps_singletons = NULL;
|
||||
}
|
||||
|
@ -285,6 +303,12 @@ int ast_tps_init(void)
|
|||
return -1;
|
||||
}
|
||||
|
||||
if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
|
||||
ao2_ref(tps_singletons, -1);
|
||||
ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
|
||||
return -1;
|
||||
}
|
||||
|
||||
ast_cond_init(&cli_ping_cond, NULL);
|
||||
|
||||
ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
|
||||
|
@ -548,6 +572,157 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
|
|||
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
|
||||
}
|
||||
|
||||
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
|
||||
{
|
||||
return !strcmp(alert->subsystem, subsystem);
|
||||
}
|
||||
|
||||
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
|
||||
{
|
||||
return strcmp(a->subsystem, b->subsystem);
|
||||
}
|
||||
|
||||
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
|
||||
{
|
||||
struct subsystem_alert *alert;
|
||||
unsigned int count = 0;
|
||||
int idx;
|
||||
|
||||
AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
|
||||
idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
|
||||
if (idx >= 0) {
|
||||
alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
|
||||
count = alert->alert_count;
|
||||
}
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
static void subsystem_alert_increment(const char *subsystem)
|
||||
{
|
||||
struct subsystem_alert *alert;
|
||||
int idx;
|
||||
|
||||
if (ast_strlen_zero(subsystem)) {
|
||||
return;
|
||||
}
|
||||
|
||||
AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
|
||||
idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
|
||||
if (idx >= 0) {
|
||||
alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
|
||||
alert->alert_count++;
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
return;
|
||||
}
|
||||
|
||||
alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
|
||||
if (!alert) {
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
return;
|
||||
}
|
||||
alert->alert_count = 1;
|
||||
strcpy(alert->subsystem, subsystem); /* Safe */
|
||||
|
||||
if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
|
||||
ast_free(alert);
|
||||
}
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
}
|
||||
|
||||
static void subsystem_alert_decrement(const char *subsystem)
|
||||
{
|
||||
struct subsystem_alert *alert;
|
||||
int idx;
|
||||
|
||||
if (ast_strlen_zero(subsystem)) {
|
||||
return;
|
||||
}
|
||||
|
||||
AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
|
||||
idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
|
||||
if (idx < 0) {
|
||||
ast_log(LOG_ERROR,
|
||||
"Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
return;
|
||||
}
|
||||
alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
|
||||
|
||||
alert->alert_count--;
|
||||
if (alert->alert_count <= 0) {
|
||||
AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
|
||||
ast_free(alert);
|
||||
}
|
||||
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
}
|
||||
|
||||
static void subsystem_copy(struct subsystem_alert *alert,
|
||||
struct subsystem_alert_vector *vector)
|
||||
{
|
||||
struct subsystem_alert *alert_copy;
|
||||
alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
|
||||
if (!alert_copy) {
|
||||
return;
|
||||
}
|
||||
alert_copy->alert_count = alert->alert_count;
|
||||
strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
|
||||
if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
|
||||
ast_free(alert_copy);
|
||||
}
|
||||
}
|
||||
|
||||
static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
|
||||
{
|
||||
struct subsystem_alert_vector sorted_subsystems;
|
||||
int i;
|
||||
|
||||
#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
|
||||
#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
|
||||
|
||||
switch (cmd) {
|
||||
case CLI_INIT:
|
||||
e->command = "core show taskprocessor alerted subsystems";
|
||||
e->usage =
|
||||
"Usage: core show taskprocessor alerted subsystems\n"
|
||||
" Shows a list of task processor subsystems that are currently alerted\n";
|
||||
return NULL;
|
||||
case CLI_GENERATE:
|
||||
return NULL;
|
||||
}
|
||||
|
||||
if (a->argc != e->args) {
|
||||
return CLI_SHOWUSAGE;
|
||||
}
|
||||
|
||||
if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
|
||||
return CLI_FAILURE;
|
||||
}
|
||||
|
||||
AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
|
||||
for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
|
||||
subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
|
||||
}
|
||||
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
|
||||
|
||||
ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
|
||||
|
||||
for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
|
||||
struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
|
||||
ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
|
||||
}
|
||||
|
||||
ast_cli(a->fd, "\n%lu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
|
||||
|
||||
AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
|
||||
AST_VECTOR_FREE(&sorted_subsystems);
|
||||
|
||||
return CLI_SUCCESS;
|
||||
}
|
||||
|
||||
|
||||
/*! Count of the number of taskprocessors in high water alert. */
|
||||
static unsigned int tps_alert_count;
|
||||
|
||||
|
@ -577,6 +752,15 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
|
|||
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
|
||||
tps->name, tps_alert_count ? "triggered" : "cleared");
|
||||
}
|
||||
|
||||
if (tps->subsystem[0] != '\0') {
|
||||
if (delta > 0) {
|
||||
subsystem_alert_increment(tps->subsystem);
|
||||
} else {
|
||||
subsystem_alert_decrement(tps->subsystem);
|
||||
}
|
||||
}
|
||||
|
||||
ast_rwlock_unlock(&tps_alert_lock);
|
||||
}
|
||||
|
||||
|
@ -747,8 +931,17 @@ static void *default_listener_pvt_alloc(void)
|
|||
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
|
||||
{
|
||||
struct ast_taskprocessor *p;
|
||||
char *subsystem_separator;
|
||||
size_t subsystem_length = 0;
|
||||
size_t name_length;
|
||||
|
||||
p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
|
||||
name_length = strlen(name);
|
||||
subsystem_separator = strchr(name, '/');
|
||||
if (subsystem_separator) {
|
||||
subsystem_length = subsystem_separator - name;
|
||||
}
|
||||
|
||||
p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
|
||||
if (!p) {
|
||||
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
|
||||
return NULL;
|
||||
|
@ -758,7 +951,9 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
|
|||
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
|
||||
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
|
||||
|
||||
strcpy(p->name, name); /*SAFE*/
|
||||
strcpy(p->name, name); /* Safe */
|
||||
p->subsystem = p->name + name_length + 1;
|
||||
ast_copy_string(p->subsystem, name, subsystem_length + 1);
|
||||
|
||||
ao2_ref(listener, +1);
|
||||
p->listener = listener;
|
||||
|
|
|
@ -413,7 +413,7 @@ static struct ast_threadpool *threadpool_alloc(const char *name, const struct as
|
|||
return NULL;
|
||||
}
|
||||
|
||||
ast_str_set(&control_tps_name, 0, "%s-control", name);
|
||||
ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
|
||||
|
||||
pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
|
||||
ast_free(control_tps_name);
|
||||
|
@ -919,6 +919,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
|
|||
struct ast_taskprocessor *tps;
|
||||
RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
|
||||
RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
|
||||
char *fullname;
|
||||
|
||||
pool = threadpool_alloc(name, options);
|
||||
if (!pool) {
|
||||
|
@ -935,7 +936,9 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
|
|||
return NULL;
|
||||
}
|
||||
|
||||
tps = ast_taskprocessor_create_with_listener(name, tps_listener);
|
||||
fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
|
||||
sprintf(fullname, "%s/pool", name); /* Safe */
|
||||
tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
|
||||
if (!tps) {
|
||||
return NULL;
|
||||
}
|
||||
|
|
|
@ -1897,6 +1897,26 @@
|
|||
<configOption name="send_contact_status_on_update_registration" default="no">
|
||||
<synopsis>Enable sending AMI ContactStatus event when a device refreshes its registration.</synopsis>
|
||||
</configOption>
|
||||
<configOption name="taskprocessor_overload_trigger">
|
||||
<synopsis>Trigger scope for taskprocessor overloads</synopsis>
|
||||
<description><para>
|
||||
This option specifies the trigger the distributor will use for
|
||||
detecting taskprocessor overloads. When it detects an overload condition,
|
||||
the distrubutor will stop accepting new requests until the overload is
|
||||
cleared.
|
||||
</para>
|
||||
<enumlist>
|
||||
<enum name="global"><para>(default) Any taskprocessor overload will trigger.</para></enum>
|
||||
<enum name="pjsip_only"><para>Only pjsip taskprocessor overloads will trigger.</para></enum>
|
||||
<enum name="none"><para>No overload detection will be performed.</para></enum>
|
||||
</enumlist>
|
||||
<warning><para>
|
||||
The "none" and "pjsip_only" options should be used
|
||||
with extreme caution and only to mitigate specific issues.
|
||||
Under certain conditions they could make things worse.
|
||||
</para></warning>
|
||||
</description>
|
||||
</configOption>
|
||||
</configObject>
|
||||
</configFile>
|
||||
</configInfo>
|
||||
|
@ -5236,7 +5256,7 @@ static int load_module(void)
|
|||
/* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
|
||||
sip_get_threadpool_options(&options);
|
||||
options.thread_start = sip_thread_start;
|
||||
sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
|
||||
sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
|
||||
if (!sip_threadpool) {
|
||||
goto error;
|
||||
}
|
||||
|
|
|
@ -51,6 +51,7 @@
|
|||
#define DEFAULT_IGNORE_URI_USER_OPTIONS 0
|
||||
#define DEFAULT_USE_CALLERID_CONTACT 0
|
||||
#define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 0
|
||||
#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
|
||||
|
||||
/*!
|
||||
* \brief Cached global config object
|
||||
|
@ -110,6 +111,8 @@ struct global_config {
|
|||
unsigned int use_callerid_contact;
|
||||
/*! Nonzero if need to send AMI ContactStatus event when a contact is updated */
|
||||
unsigned int send_contact_status_on_update_registration;
|
||||
/*! Trigger the distributor should use to pause accepting new dialogs */
|
||||
enum ast_sip_taskprocessor_overload_trigger overload_trigger;
|
||||
};
|
||||
|
||||
static void global_destructor(void *obj)
|
||||
|
@ -483,6 +486,58 @@ unsigned int ast_sip_get_send_contact_status_on_update_registration(void)
|
|||
return send_contact_status_on_update_registration;
|
||||
}
|
||||
|
||||
enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void)
|
||||
{
|
||||
enum ast_sip_taskprocessor_overload_trigger trigger;
|
||||
struct global_config *cfg;
|
||||
|
||||
cfg = get_global_cfg();
|
||||
if (!cfg) {
|
||||
return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER;
|
||||
}
|
||||
|
||||
trigger = cfg->overload_trigger;
|
||||
ao2_ref(cfg, -1);
|
||||
return trigger;
|
||||
}
|
||||
|
||||
static int overload_trigger_handler(const struct aco_option *opt,
|
||||
struct ast_variable *var, void *obj)
|
||||
{
|
||||
struct global_config *cfg = obj;
|
||||
if (!strcasecmp(var->value, "none")) {
|
||||
cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE;
|
||||
} else if (!strcasecmp(var->value, "global")) {
|
||||
cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL;
|
||||
} else if (!strcasecmp(var->value, "pjsip_only")) {
|
||||
cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY;
|
||||
} else {
|
||||
ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n",
|
||||
var->value, var->name);
|
||||
return -1;
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
static const char *overload_trigger_map[] = {
|
||||
[TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none",
|
||||
[TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global",
|
||||
[TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only"
|
||||
};
|
||||
|
||||
const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger)
|
||||
{
|
||||
return ARRAY_IN_BOUNDS(trigger, overload_trigger_map) ?
|
||||
overload_trigger_map[trigger] : "";
|
||||
}
|
||||
|
||||
static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf)
|
||||
{
|
||||
const struct global_config *cfg = obj;
|
||||
*buf = ast_strdup(ast_sip_overload_trigger_to_str(cfg->overload_trigger));
|
||||
return 0;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \internal
|
||||
* \brief Observer to set default global object if none exist.
|
||||
|
@ -646,6 +701,9 @@ int ast_sip_initialize_sorcery_global(void)
|
|||
ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration",
|
||||
DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no",
|
||||
OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration));
|
||||
ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger",
|
||||
overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER],
|
||||
overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0);
|
||||
|
||||
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
|
||||
return -1;
|
||||
|
|
|
@ -408,4 +408,14 @@ void ast_sip_destroy_transport_management(void);
|
|||
*/
|
||||
int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext);
|
||||
|
||||
enum ast_sip_taskprocessor_overload_trigger {
|
||||
TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0,
|
||||
TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL,
|
||||
TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY
|
||||
};
|
||||
|
||||
enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void);
|
||||
|
||||
const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger);
|
||||
|
||||
#endif /* RES_PJSIP_PRIVATE_H_ */
|
||||
|
|
|
@ -51,6 +51,7 @@ static unsigned int unidentified_count;
|
|||
static unsigned int unidentified_period;
|
||||
static unsigned int unidentified_prune_interval;
|
||||
static int using_auth_username;
|
||||
static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
|
||||
|
||||
struct unidentified_request{
|
||||
struct timeval first_seen;
|
||||
|
@ -534,7 +535,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
|
|||
ao2_cleanup(dist);
|
||||
return PJ_TRUE;
|
||||
} else {
|
||||
if (ast_taskprocessor_alert_get()) {
|
||||
if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
|
||||
ast_taskprocessor_alert_get())
|
||||
|| (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
|
||||
ast_taskprocessor_get_subsystem_alert("pjsip"))) {
|
||||
/*
|
||||
* When taskprocessors get backed up, there is a good chance that
|
||||
* we are being overloaded and need to defer adding new work to
|
||||
|
@ -1196,6 +1200,8 @@ static void global_loaded(const char *object_type)
|
|||
|
||||
ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
|
||||
|
||||
overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
|
||||
|
||||
/* Clean out the old task, if any */
|
||||
ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
|
||||
/* Have to do something with the return value to shut up the stupid compiler. */
|
||||
|
|
|
@ -46,6 +46,8 @@ struct task_data {
|
|||
ast_mutex_t lock;
|
||||
/*! Boolean indicating that the task was run */
|
||||
int task_complete;
|
||||
/*! Milliseconds to wait before returning */
|
||||
unsigned long wait_time;
|
||||
};
|
||||
|
||||
static void task_data_dtor(void *obj)
|
||||
|
@ -69,6 +71,7 @@ static struct task_data *task_data_create(void)
|
|||
ast_cond_init(&task_data->cond, NULL);
|
||||
ast_mutex_init(&task_data->lock);
|
||||
task_data->task_complete = 0;
|
||||
task_data->wait_time = 0;
|
||||
|
||||
return task_data;
|
||||
}
|
||||
|
@ -83,7 +86,11 @@ static struct task_data *task_data_create(void)
|
|||
static int task(void *data)
|
||||
{
|
||||
struct task_data *task_data = data;
|
||||
|
||||
SCOPED_MUTEX(lock, &task_data->lock);
|
||||
if (task_data->wait_time > 0) {
|
||||
usleep(task_data->wait_time * 1000);
|
||||
}
|
||||
task_data->task_complete = 1;
|
||||
ast_cond_signal(&task_data->cond);
|
||||
return 0;
|
||||
|
@ -165,6 +172,143 @@ AST_TEST_DEFINE(default_taskprocessor)
|
|||
return AST_TEST_PASS;
|
||||
}
|
||||
|
||||
/*!
|
||||
* \brief Baseline test for subsystem alert
|
||||
*/
|
||||
AST_TEST_DEFINE(subsystem_alert)
|
||||
{
|
||||
RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
|
||||
#define TEST_DATA_ARRAY_SIZE 10
|
||||
#define LOW_WATER_MARK 3
|
||||
#define HIGH_WATER_MARK 6
|
||||
struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
|
||||
int res;
|
||||
int i;
|
||||
long queue_count;
|
||||
unsigned int alert_level;
|
||||
unsigned int subsystem_alert_level;
|
||||
|
||||
switch (cmd) {
|
||||
case TEST_INIT:
|
||||
info->name = "subsystem_alert";
|
||||
info->category = "/main/taskprocessor/";
|
||||
info->summary = "Test of subsystem alerts";
|
||||
info->description =
|
||||
"Ensures alerts are generated properly.";
|
||||
return AST_TEST_NOT_RUN;
|
||||
case TEST_EXECUTE:
|
||||
break;
|
||||
}
|
||||
|
||||
tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
|
||||
|
||||
if (!tps) {
|
||||
ast_test_status_update(test, "Unable to create test taskprocessor\n");
|
||||
return AST_TEST_FAIL;
|
||||
}
|
||||
|
||||
ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
|
||||
ast_taskprocessor_suspend(tps);
|
||||
|
||||
for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
|
||||
task_data[i] = task_data_create();
|
||||
if (!task_data[i]) {
|
||||
ast_test_status_update(test, "Unable to create task_data\n");
|
||||
res = -1;
|
||||
goto data_cleanup;
|
||||
}
|
||||
task_data[i]->wait_time = 500;
|
||||
|
||||
ast_test_status_update(test, "Pushing task %d\n", i);
|
||||
if (ast_taskprocessor_push(tps, task, task_data[i])) {
|
||||
ast_test_status_update(test, "Failed to queue task\n");
|
||||
res = -1;
|
||||
goto data_cleanup;
|
||||
}
|
||||
|
||||
queue_count = ast_taskprocessor_size(tps);
|
||||
alert_level = ast_taskprocessor_alert_get();
|
||||
subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
|
||||
|
||||
if (queue_count == HIGH_WATER_MARK) {
|
||||
if (subsystem_alert_level) {
|
||||
ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
|
||||
}
|
||||
if (alert_level) {
|
||||
ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
|
||||
}
|
||||
} else if (queue_count < HIGH_WATER_MARK) {
|
||||
if (subsystem_alert_level > 0) {
|
||||
ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
if (alert_level > 0) {
|
||||
ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
} else {
|
||||
if (subsystem_alert_level == 0) {
|
||||
ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
if (alert_level == 0) {
|
||||
ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
ast_taskprocessor_unsuspend(tps);
|
||||
|
||||
for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
|
||||
ast_test_status_update(test, "Waiting on task %d\n", i);
|
||||
if (task_wait(task_data[i])) {
|
||||
ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
|
||||
res = -1;
|
||||
goto data_cleanup;
|
||||
}
|
||||
|
||||
queue_count = ast_taskprocessor_size(tps);
|
||||
alert_level = ast_taskprocessor_alert_get();
|
||||
subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
|
||||
|
||||
if (queue_count == LOW_WATER_MARK) {
|
||||
if (!subsystem_alert_level) {
|
||||
ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
|
||||
}
|
||||
if (!alert_level) {
|
||||
ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
|
||||
}
|
||||
} else if (queue_count > LOW_WATER_MARK) {
|
||||
if (subsystem_alert_level == 0) {
|
||||
ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
if (alert_level == 0) {
|
||||
ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
} else {
|
||||
if (subsystem_alert_level > 0) {
|
||||
ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
if (alert_level > 0) {
|
||||
ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
|
||||
res = -1;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
data_cleanup:
|
||||
for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
|
||||
ao2_cleanup(task_data[i]);
|
||||
}
|
||||
|
||||
return res ? AST_TEST_FAIL : AST_TEST_PASS;
|
||||
}
|
||||
|
||||
#define NUM_TASKS 20000
|
||||
|
||||
/*!
|
||||
|
@ -749,6 +893,7 @@ static int unload_module(void)
|
|||
{
|
||||
ast_test_unregister(default_taskprocessor);
|
||||
ast_test_unregister(default_taskprocessor_load);
|
||||
ast_test_unregister(subsystem_alert);
|
||||
ast_test_unregister(taskprocessor_listener);
|
||||
ast_test_unregister(taskprocessor_shutdown);
|
||||
ast_test_unregister(taskprocessor_push_local);
|
||||
|
@ -759,6 +904,7 @@ static int load_module(void)
|
|||
{
|
||||
ast_test_register(default_taskprocessor);
|
||||
ast_test_register(default_taskprocessor_load);
|
||||
ast_test_register(subsystem_alert);
|
||||
ast_test_register(taskprocessor_listener);
|
||||
ast_test_register(taskprocessor_shutdown);
|
||||
ast_test_register(taskprocessor_push_local);
|
||||
|
|
Loading…
Reference in New Issue