taskprocessor: Enable subsystems and overload by subsystem

To prevent one subsystem's taskprocessors from causing others
to stall, new capabilities have been added to taskprocessors.

* Any taskprocessor name that has a '/' will have the part
  before the '/' saved as its "subsystem".
  Examples:
  "sorcery/acl-0000006a" and "sorcery/aor-00000019"
  will be grouped to subsystem "sorcery".
  "pjsip/distributor-00000025" and "pjsip/distributor-00000026"
  will bn grouped to subsystem "pjsip".
  Taskprocessors with no '/' have an empty subsystem.

* When a taskprocessor enters high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will
  be incremented.

* When a taskprocessor leaves high-water alert status and it
  has a non-empty subsystem, the subsystem alert count will be
  decremented.

* A new api ast_taskprocessor_get_subsystem_alert() has been
  added that returns the number of taskprocessors in alert for
  the subsystem.

* A new CLI command "core show taskprocessor alerted subsystems"
  has been added.

* A new unit test was addded.

REMINDER: The taskprocessor code itself doesn't take any action
based on high-water alerts or overloading.  It's up to taskprocessor
users to check and take action themselves.  Currently only the pjsip
distributor does this.

* A new pjsip/global option "taskprocessor_overload_trigger"
  has been added that allows the user to select the trigger
  mechanism the distributor uses to pause accepting new requests.
  "none": Don't pause on any overload condition.
  "global": Pause on ANY taskprocessor overload (the default and
  current behavior)
  "pjsip_only": Pause only on pjsip taskprocessor overloads.

* The core pjsip pool was renamed from "SIP" to "pjsip" so it can
  be properly grouped into the "pjsip" subsystem.

* stasis taskprocessor names were changed to "stasis" as the
  subsystem.

* Sorcery core taskprocessor names were changed to "sorcery" to
  match the object taskprocessors.

Change-Id: I8c19068bb2fc26610a9f0b8624bdf577a04fcd56
This commit is contained in:
George Joseph 2019-02-15 11:53:50 -07:00
parent 1c5def4b18
commit 2f8def1453
13 changed files with 523 additions and 10 deletions

View File

@ -20,6 +20,15 @@ ARI
types defined in the "disallowed" list are not sent to the application. Note
that if a type is specified in both lists "disallowed" takes precedence.
res_pjsip
------------------
* A new configuration parameter "taskprocessor_overload_trigger" has been
added to the pjsip.conf "globals" section. The distributor currently stops
accepting new requests when any taskprocessor overload is triggered. The
new option allows you to completely disable overload detection (NOT
RECOMMENDED), keep the current behavior, or trigger only on pjsip
taskprocessor overloads.
------------------------------------------------------------------------------
--- Functionality changes from Asterisk 16.1.0 to Asterisk 16.2.0 ------------
------------------------------------------------------------------------------

View File

@ -1137,6 +1137,17 @@
; event when a device refreshes its registration
; (default: "no")
;taskprocessor_overload_trigger=global
; Set the trigger the distributor will use to detect
; taskprocessor overloads. When triggered, the distributor
; will not accept any new requests until the overload has
; cleared.
: "global": (default) Any taskprocessor overload will trigger.
; "pjsip_only": Only pjsip taskprocessor overloads will trigger.
; "none": No overload detection will be performed.
; WARNING: The "none" and "pjsip_only" options should be used
; with extreme caution and only to mitigate specific issues.
; Under certain conditions they could make things worse.
; MODULE PROVIDING BELOW SECTION(S): res_pjsip_acl
;==========================ACL SECTION OPTIONS=========================

View File

@ -0,0 +1,42 @@
"""taskprocessor_overload_trigger
Revision ID: f3c0b8695b66
Revises: 0838f8db6a61
Create Date: 2019-02-15 15:03:50.106790
"""
# revision identifiers, used by Alembic.
revision = 'f3c0b8695b66'
down_revision = '0838f8db6a61'
from alembic import op
import sqlalchemy as sa
from sqlalchemy.dialects.postgresql import ENUM
PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME = 'pjsip_taskprocessor_overload_trigger_values'
PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES = ['none', 'global', 'pjsip_only']
def upgrade():
context = op.get_context()
if context.bind.dialect.name == 'postgresql':
enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
enum.create(op.get_bind(), checkfirst=False)
op.add_column('ps_globals',
sa.Column('taskprocessor_overload_trigger',
sa.Enum(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME,
create_type=False)))
def downgrade():
if op.get_context().bind.dialect.name == 'mssql':
op.drop_constraint('ck_ps_globals_taskprocessor_overload_trigger_pjsip_taskprocessor_overload_trigger_values', 'ps_globals')
op.drop_column('ps_globals', 'taskprocessor_overload_trigger')
if context.bind.dialect.name == 'postgresql':
enum = ENUM(*PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_VALUES,
name=PJSIP_TASKPROCESSOR_OVERLOAD_TRIGGER_NAME)
enum.drop(op.get_bind(), checkfirst=False)

View File

@ -341,6 +341,19 @@ long ast_taskprocessor_size(struct ast_taskprocessor *tps);
*/
unsigned int ast_taskprocessor_alert_get(void);
/*!
* \brief Get the current taskprocessor high water alert count by sybsystem.
* \since 13.26.0
* \since 16.3.0
*
* \param subsystem The subsystem name
*
* \retval 0 if no taskprocessors are in high water alert.
* \retval non-zero if some task processors are in high water alert.
*/
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem);
/*!
* \brief Set the high and low alert water marks of the given taskprocessor queue.
* \since 13.10.0

View File

@ -380,7 +380,7 @@ int ast_sorcery_init(void)
};
ast_assert(wizards == NULL);
threadpool = ast_threadpool_create("Sorcery", NULL, &options);
threadpool = ast_threadpool_create("sorcery", NULL, &options);
if (!threadpool) {
return -1;
}

View File

@ -677,7 +677,7 @@ struct stasis_subscription *internal_stasis_subscribe(
char tps_name[AST_TASKPROCESSOR_MAX_NAME + 1];
/* Create name with seq number appended. */
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "sub%c:%s",
ast_taskprocessor_build_name(tps_name, sizeof(tps_name), "stasis/%c:%s",
use_thread_pool ? 'p' : 'm',
stasis_topic_name(topic));
@ -2593,7 +2593,7 @@ int stasis_init(void)
threadpool_opts.auto_increment = 1;
threadpool_opts.max_size = cfg->threadpool_options->max_size;
threadpool_opts.idle_timeout = cfg->threadpool_options->idle_timeout_sec;
pool = ast_threadpool_create("stasis-core", NULL, &threadpool_opts);
pool = ast_threadpool_create("stasis", NULL, &threadpool_opts);
ao2_ref(cfg, -1);
if (!pool) {
ast_log(LOG_ERROR, "Failed to create 'stasis-core' threadpool\n");

View File

@ -89,7 +89,11 @@ struct ast_taskprocessor {
unsigned int high_water_alert:1;
/*! Indicates if the taskprocessor is currently suspended */
unsigned int suspended:1;
/*! \brief Friendly name of the taskprocessor */
/*! \brief Anything before the first '/' in the name (if there is one) */
char *subsystem;
/*! \brief Friendly name of the taskprocessor.
* Subsystem is appended after the name's NULL terminator.
*/
char name[0];
};
@ -112,6 +116,16 @@ struct ast_taskprocessor_listener {
void *user_data;
};
/*!
* Keep track of which subsystems are in alert
* and how many of their taskprocessors are overloaded.
*/
struct subsystem_alert {
unsigned int alert_count;
char subsystem[0];
};
static AST_VECTOR_RW(subsystem_alert_vector, struct subsystem_alert *) overloaded_subsystems;
#ifdef LOW_MEMORY
#define TPS_MAX_BUCKETS 61
#else
@ -138,10 +152,12 @@ static int tps_ping_handler(void *datap);
static char *cli_tps_ping(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_tps_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a);
static struct ast_cli_entry taskprocessor_clis[] = {
AST_CLI_DEFINE(cli_tps_ping, "Ping a named task processor"),
AST_CLI_DEFINE(cli_tps_report, "List instantiated task processors and statistics"),
AST_CLI_DEFINE(cli_subsystem_alert_report, "List task processor subsystems in alert"),
};
struct default_taskprocessor_listener_pvt {
@ -271,6 +287,8 @@ static const struct ast_taskprocessor_listener_callbacks default_listener_callba
static void tps_shutdown(void)
{
ast_cli_unregister_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
AST_VECTOR_CALLBACK_VOID(&overloaded_subsystems, ast_free);
AST_VECTOR_RW_FREE(&overloaded_subsystems);
ao2_t_ref(tps_singletons, -1, "Unref tps_singletons in shutdown");
tps_singletons = NULL;
}
@ -285,6 +303,12 @@ int ast_tps_init(void)
return -1;
}
if (AST_VECTOR_RW_INIT(&overloaded_subsystems, 10)) {
ao2_ref(tps_singletons, -1);
ast_log(LOG_ERROR, "taskprocessor subsystems vector failed to initialize!\n");
return -1;
}
ast_cond_init(&cli_ping_cond, NULL);
ast_cli_register_multiple(taskprocessor_clis, ARRAY_LEN(taskprocessor_clis));
@ -548,6 +572,157 @@ static int tps_cmp_cb(void *obj, void *arg, int flags)
return !strcasecmp(lhs->name, rhsname) ? CMP_MATCH | CMP_STOP : 0;
}
static int subsystem_match(struct subsystem_alert *alert, const char *subsystem)
{
return !strcmp(alert->subsystem, subsystem);
}
static int subsystem_cmp(struct subsystem_alert *a, struct subsystem_alert *b)
{
return strcmp(a->subsystem, b->subsystem);
}
unsigned int ast_taskprocessor_get_subsystem_alert(const char *subsystem)
{
struct subsystem_alert *alert;
unsigned int count = 0;
int idx;
AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
if (idx >= 0) {
alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
count = alert->alert_count;
}
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
return count;
}
static void subsystem_alert_increment(const char *subsystem)
{
struct subsystem_alert *alert;
int idx;
if (ast_strlen_zero(subsystem)) {
return;
}
AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
if (idx >= 0) {
alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
alert->alert_count++;
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
return;
}
alert = ast_malloc(sizeof(*alert) + strlen(subsystem) + 1);
if (!alert) {
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
return;
}
alert->alert_count = 1;
strcpy(alert->subsystem, subsystem); /* Safe */
if (AST_VECTOR_APPEND(&overloaded_subsystems, alert)) {
ast_free(alert);
}
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
}
static void subsystem_alert_decrement(const char *subsystem)
{
struct subsystem_alert *alert;
int idx;
if (ast_strlen_zero(subsystem)) {
return;
}
AST_VECTOR_RW_WRLOCK(&overloaded_subsystems);
idx = AST_VECTOR_GET_INDEX(&overloaded_subsystems, subsystem, subsystem_match);
if (idx < 0) {
ast_log(LOG_ERROR,
"Can't decrement alert count for subsystem '%s' as it wasn't in alert\n", subsystem);
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
return;
}
alert = AST_VECTOR_GET(&overloaded_subsystems, idx);
alert->alert_count--;
if (alert->alert_count <= 0) {
AST_VECTOR_REMOVE(&overloaded_subsystems, idx, 0);
ast_free(alert);
}
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
}
static void subsystem_copy(struct subsystem_alert *alert,
struct subsystem_alert_vector *vector)
{
struct subsystem_alert *alert_copy;
alert_copy = ast_malloc(sizeof(*alert_copy) + strlen(alert->subsystem) + 1);
if (!alert_copy) {
return;
}
alert_copy->alert_count = alert->alert_count;
strcpy(alert_copy->subsystem, alert->subsystem); /* Safe */
if (AST_VECTOR_ADD_SORTED(vector, alert_copy, subsystem_cmp)) {
ast_free(alert_copy);
}
}
static char *cli_subsystem_alert_report(struct ast_cli_entry *e, int cmd, struct ast_cli_args *a)
{
struct subsystem_alert_vector sorted_subsystems;
int i;
#define FMT_HEADERS_SUBSYSTEM "%-32s %12s\n"
#define FMT_FIELDS_SUBSYSTEM "%-32s %12u\n"
switch (cmd) {
case CLI_INIT:
e->command = "core show taskprocessor alerted subsystems";
e->usage =
"Usage: core show taskprocessor alerted subsystems\n"
" Shows a list of task processor subsystems that are currently alerted\n";
return NULL;
case CLI_GENERATE:
return NULL;
}
if (a->argc != e->args) {
return CLI_SHOWUSAGE;
}
if (AST_VECTOR_INIT(&sorted_subsystems, AST_VECTOR_SIZE(&overloaded_subsystems))) {
return CLI_FAILURE;
}
AST_VECTOR_RW_RDLOCK(&overloaded_subsystems);
for (i = 0; i < AST_VECTOR_SIZE(&overloaded_subsystems); i++) {
subsystem_copy(AST_VECTOR_GET(&overloaded_subsystems, i), &sorted_subsystems);
}
AST_VECTOR_RW_UNLOCK(&overloaded_subsystems);
ast_cli(a->fd, "\n" FMT_HEADERS_SUBSYSTEM, "Subsystem", "Alert Count");
for (i = 0; i < AST_VECTOR_SIZE(&sorted_subsystems); i++) {
struct subsystem_alert *alert = AST_VECTOR_GET(&sorted_subsystems, i);
ast_cli(a->fd, FMT_FIELDS_SUBSYSTEM, alert->subsystem, alert->alert_count);
}
ast_cli(a->fd, "\n%lu subsystems\n\n", AST_VECTOR_SIZE(&sorted_subsystems));
AST_VECTOR_CALLBACK_VOID(&sorted_subsystems, ast_free);
AST_VECTOR_FREE(&sorted_subsystems);
return CLI_SUCCESS;
}
/*! Count of the number of taskprocessors in high water alert. */
static unsigned int tps_alert_count;
@ -577,6 +752,15 @@ static void tps_alert_add(struct ast_taskprocessor *tps, int delta)
ast_log(LOG_DEBUG, "Taskprocessor '%s' %s the high water alert.\n",
tps->name, tps_alert_count ? "triggered" : "cleared");
}
if (tps->subsystem[0] != '\0') {
if (delta > 0) {
subsystem_alert_increment(tps->subsystem);
} else {
subsystem_alert_decrement(tps->subsystem);
}
}
ast_rwlock_unlock(&tps_alert_lock);
}
@ -747,8 +931,17 @@ static void *default_listener_pvt_alloc(void)
static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, struct ast_taskprocessor_listener *listener)
{
struct ast_taskprocessor *p;
char *subsystem_separator;
size_t subsystem_length = 0;
size_t name_length;
p = ao2_alloc(sizeof(*p) + strlen(name) + 1, tps_taskprocessor_dtor);
name_length = strlen(name);
subsystem_separator = strchr(name, '/');
if (subsystem_separator) {
subsystem_length = subsystem_separator - name;
}
p = ao2_alloc(sizeof(*p) + name_length + subsystem_length + 2, tps_taskprocessor_dtor);
if (!p) {
ast_log(LOG_WARNING, "failed to create taskprocessor '%s'\n", name);
return NULL;
@ -758,7 +951,9 @@ static struct ast_taskprocessor *__allocate_taskprocessor(const char *name, stru
p->tps_queue_low = (AST_TASKPROCESSOR_HIGH_WATER_LEVEL * 9) / 10;
p->tps_queue_high = AST_TASKPROCESSOR_HIGH_WATER_LEVEL;
strcpy(p->name, name); /*SAFE*/
strcpy(p->name, name); /* Safe */
p->subsystem = p->name + name_length + 1;
ast_copy_string(p->subsystem, name, subsystem_length + 1);
ao2_ref(listener, +1);
p->listener = listener;

View File

@ -413,7 +413,7 @@ static struct ast_threadpool *threadpool_alloc(const char *name, const struct as
return NULL;
}
ast_str_set(&control_tps_name, 0, "%s-control", name);
ast_str_set(&control_tps_name, 0, "%s/pool-control", name);
pool->control_tps = ast_taskprocessor_get(ast_str_buffer(control_tps_name), TPS_REF_DEFAULT);
ast_free(control_tps_name);
@ -919,6 +919,7 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
struct ast_taskprocessor *tps;
RAII_VAR(struct ast_taskprocessor_listener *, tps_listener, NULL, ao2_cleanup);
RAII_VAR(struct ast_threadpool *, pool, NULL, ao2_cleanup);
char *fullname;
pool = threadpool_alloc(name, options);
if (!pool) {
@ -935,7 +936,9 @@ struct ast_threadpool *ast_threadpool_create(const char *name,
return NULL;
}
tps = ast_taskprocessor_create_with_listener(name, tps_listener);
fullname = ast_alloca(strlen(name) + strlen("/pool") + 1);
sprintf(fullname, "%s/pool", name); /* Safe */
tps = ast_taskprocessor_create_with_listener(fullname, tps_listener);
if (!tps) {
return NULL;
}

View File

@ -1897,6 +1897,26 @@
<configOption name="send_contact_status_on_update_registration" default="no">
<synopsis>Enable sending AMI ContactStatus event when a device refreshes its registration.</synopsis>
</configOption>
<configOption name="taskprocessor_overload_trigger">
<synopsis>Trigger scope for taskprocessor overloads</synopsis>
<description><para>
This option specifies the trigger the distributor will use for
detecting taskprocessor overloads. When it detects an overload condition,
the distrubutor will stop accepting new requests until the overload is
cleared.
</para>
<enumlist>
<enum name="global"><para>(default) Any taskprocessor overload will trigger.</para></enum>
<enum name="pjsip_only"><para>Only pjsip taskprocessor overloads will trigger.</para></enum>
<enum name="none"><para>No overload detection will be performed.</para></enum>
</enumlist>
<warning><para>
The "none" and "pjsip_only" options should be used
with extreme caution and only to mitigate specific issues.
Under certain conditions they could make things worse.
</para></warning>
</description>
</configOption>
</configObject>
</configFile>
</configInfo>
@ -5236,7 +5256,7 @@ static int load_module(void)
/* The serializer needs threadpool and threadpool needs pjproject to be initialized so it's next */
sip_get_threadpool_options(&options);
options.thread_start = sip_thread_start;
sip_threadpool = ast_threadpool_create("SIP", NULL, &options);
sip_threadpool = ast_threadpool_create("pjsip", NULL, &options);
if (!sip_threadpool) {
goto error;
}

View File

@ -51,6 +51,7 @@
#define DEFAULT_IGNORE_URI_USER_OPTIONS 0
#define DEFAULT_USE_CALLERID_CONTACT 0
#define DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION 0
#define DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL
/*!
* \brief Cached global config object
@ -110,6 +111,8 @@ struct global_config {
unsigned int use_callerid_contact;
/*! Nonzero if need to send AMI ContactStatus event when a contact is updated */
unsigned int send_contact_status_on_update_registration;
/*! Trigger the distributor should use to pause accepting new dialogs */
enum ast_sip_taskprocessor_overload_trigger overload_trigger;
};
static void global_destructor(void *obj)
@ -483,6 +486,58 @@ unsigned int ast_sip_get_send_contact_status_on_update_registration(void)
return send_contact_status_on_update_registration;
}
enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void)
{
enum ast_sip_taskprocessor_overload_trigger trigger;
struct global_config *cfg;
cfg = get_global_cfg();
if (!cfg) {
return DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER;
}
trigger = cfg->overload_trigger;
ao2_ref(cfg, -1);
return trigger;
}
static int overload_trigger_handler(const struct aco_option *opt,
struct ast_variable *var, void *obj)
{
struct global_config *cfg = obj;
if (!strcasecmp(var->value, "none")) {
cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_NONE;
} else if (!strcasecmp(var->value, "global")) {
cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL;
} else if (!strcasecmp(var->value, "pjsip_only")) {
cfg->overload_trigger = TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY;
} else {
ast_log(LOG_WARNING, "Unknown overload trigger '%s' specified for %s\n",
var->value, var->name);
return -1;
}
return 0;
}
static const char *overload_trigger_map[] = {
[TASKPROCESSOR_OVERLOAD_TRIGGER_NONE] = "none",
[TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL] = "global",
[TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY] = "pjsip_only"
};
const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger)
{
return ARRAY_IN_BOUNDS(trigger, overload_trigger_map) ?
overload_trigger_map[trigger] : "";
}
static int overload_trigger_to_str(const void *obj, const intptr_t *args, char **buf)
{
const struct global_config *cfg = obj;
*buf = ast_strdup(ast_sip_overload_trigger_to_str(cfg->overload_trigger));
return 0;
}
/*!
* \internal
* \brief Observer to set default global object if none exist.
@ -646,6 +701,9 @@ int ast_sip_initialize_sorcery_global(void)
ast_sorcery_object_field_register(sorcery, "global", "send_contact_status_on_update_registration",
DEFAULT_SEND_CONTACT_STATUS_ON_UPDATE_REGISTRATION ? "yes" : "no",
OPT_YESNO_T, 1, FLDSET(struct global_config, send_contact_status_on_update_registration));
ast_sorcery_object_field_register_custom(sorcery, "global", "taskprocessor_overload_trigger",
overload_trigger_map[DEFAULT_TASKPROCESSOR_OVERLOAD_TRIGGER],
overload_trigger_handler, overload_trigger_to_str, NULL, 0, 0);
if (ast_sorcery_instance_observer_add(sorcery, &observer_callbacks_global)) {
return -1;

View File

@ -408,4 +408,14 @@ void ast_sip_destroy_transport_management(void);
*/
int ast_sip_persistent_endpoint_add_to_regcontext(const char *regcontext);
enum ast_sip_taskprocessor_overload_trigger {
TASKPROCESSOR_OVERLOAD_TRIGGER_NONE = 0,
TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL,
TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY
};
enum ast_sip_taskprocessor_overload_trigger ast_sip_get_taskprocessor_overload_trigger(void);
const char *ast_sip_overload_trigger_to_str(enum ast_sip_taskprocessor_overload_trigger trigger);
#endif /* RES_PJSIP_PRIVATE_H_ */

View File

@ -51,6 +51,7 @@ static unsigned int unidentified_count;
static unsigned int unidentified_period;
static unsigned int unidentified_prune_interval;
static int using_auth_username;
static enum ast_sip_taskprocessor_overload_trigger overload_trigger;
struct unidentified_request{
struct timeval first_seen;
@ -534,7 +535,10 @@ static pj_bool_t distributor(pjsip_rx_data *rdata)
ao2_cleanup(dist);
return PJ_TRUE;
} else {
if (ast_taskprocessor_alert_get()) {
if ((overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_GLOBAL &&
ast_taskprocessor_alert_get())
|| (overload_trigger == TASKPROCESSOR_OVERLOAD_TRIGGER_PJSIP_ONLY &&
ast_taskprocessor_get_subsystem_alert("pjsip"))) {
/*
* When taskprocessors get backed up, there is a good chance that
* we are being overloaded and need to defer adding new work to
@ -1196,6 +1200,8 @@ static void global_loaded(const char *object_type)
ast_sip_get_unidentified_request_thresholds(&unidentified_count, &unidentified_period, &unidentified_prune_interval);
overload_trigger = ast_sip_get_taskprocessor_overload_trigger();
/* Clean out the old task, if any */
ast_sched_clean_by_callback(prune_context, prune_task, clean_task);
/* Have to do something with the return value to shut up the stupid compiler. */

View File

@ -46,6 +46,8 @@ struct task_data {
ast_mutex_t lock;
/*! Boolean indicating that the task was run */
int task_complete;
/*! Milliseconds to wait before returning */
unsigned long wait_time;
};
static void task_data_dtor(void *obj)
@ -69,6 +71,7 @@ static struct task_data *task_data_create(void)
ast_cond_init(&task_data->cond, NULL);
ast_mutex_init(&task_data->lock);
task_data->task_complete = 0;
task_data->wait_time = 0;
return task_data;
}
@ -83,7 +86,11 @@ static struct task_data *task_data_create(void)
static int task(void *data)
{
struct task_data *task_data = data;
SCOPED_MUTEX(lock, &task_data->lock);
if (task_data->wait_time > 0) {
usleep(task_data->wait_time * 1000);
}
task_data->task_complete = 1;
ast_cond_signal(&task_data->cond);
return 0;
@ -165,6 +172,143 @@ AST_TEST_DEFINE(default_taskprocessor)
return AST_TEST_PASS;
}
/*!
* \brief Baseline test for subsystem alert
*/
AST_TEST_DEFINE(subsystem_alert)
{
RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
#define TEST_DATA_ARRAY_SIZE 10
#define LOW_WATER_MARK 3
#define HIGH_WATER_MARK 6
struct task_data *task_data[(TEST_DATA_ARRAY_SIZE + 1)] = { 0 };
int res;
int i;
long queue_count;
unsigned int alert_level;
unsigned int subsystem_alert_level;
switch (cmd) {
case TEST_INIT:
info->name = "subsystem_alert";
info->category = "/main/taskprocessor/";
info->summary = "Test of subsystem alerts";
info->description =
"Ensures alerts are generated properly.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tps = ast_taskprocessor_get("test_subsystem/test", TPS_REF_DEFAULT);
if (!tps) {
ast_test_status_update(test, "Unable to create test taskprocessor\n");
return AST_TEST_FAIL;
}
ast_taskprocessor_alert_set_levels(tps, LOW_WATER_MARK, HIGH_WATER_MARK);
ast_taskprocessor_suspend(tps);
for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
task_data[i] = task_data_create();
if (!task_data[i]) {
ast_test_status_update(test, "Unable to create task_data\n");
res = -1;
goto data_cleanup;
}
task_data[i]->wait_time = 500;
ast_test_status_update(test, "Pushing task %d\n", i);
if (ast_taskprocessor_push(tps, task, task_data[i])) {
ast_test_status_update(test, "Failed to queue task\n");
res = -1;
goto data_cleanup;
}
queue_count = ast_taskprocessor_size(tps);
alert_level = ast_taskprocessor_alert_get();
subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
if (queue_count == HIGH_WATER_MARK) {
if (subsystem_alert_level) {
ast_test_status_update(test, "Subsystem alert triggered correctly at %ld\n", queue_count);
}
if (alert_level) {
ast_test_status_update(test, "Global alert triggered correctly at %ld\n", queue_count);
}
} else if (queue_count < HIGH_WATER_MARK) {
if (subsystem_alert_level > 0) {
ast_test_status_update(test, "Subsystem alert triggered unexpectedly at %ld\n", queue_count);
res = -1;
}
if (alert_level > 0) {
ast_test_status_update(test, "Global alert triggered unexpectedly at %ld\n", queue_count);
res = -1;
}
} else {
if (subsystem_alert_level == 0) {
ast_test_status_update(test, "Subsystem alert failed to trigger at %ld\n", queue_count);
res = -1;
}
if (alert_level == 0) {
ast_test_status_update(test, "Global alert failed to trigger at %ld\n", queue_count);
res = -1;
}
}
}
ast_taskprocessor_unsuspend(tps);
for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
ast_test_status_update(test, "Waiting on task %d\n", i);
if (task_wait(task_data[i])) {
ast_test_status_update(test, "Queued task '%d' did not execute!\n", i);
res = -1;
goto data_cleanup;
}
queue_count = ast_taskprocessor_size(tps);
alert_level = ast_taskprocessor_alert_get();
subsystem_alert_level = ast_taskprocessor_get_subsystem_alert("test_subsystem");
if (queue_count == LOW_WATER_MARK) {
if (!subsystem_alert_level) {
ast_test_status_update(test, "Subsystem alert cleared correctly at %ld\n", queue_count);
}
if (!alert_level) {
ast_test_status_update(test, "Global alert cleared correctly at %ld\n", queue_count);
}
} else if (queue_count > LOW_WATER_MARK) {
if (subsystem_alert_level == 0) {
ast_test_status_update(test, "Subsystem alert cleared unexpectedly at %ld\n", queue_count);
res = -1;
}
if (alert_level == 0) {
ast_test_status_update(test, "Global alert cleared unexpectedly at %ld\n", queue_count);
res = -1;
}
} else {
if (subsystem_alert_level > 0) {
ast_test_status_update(test, "Subsystem alert failed to clear at %ld\n", queue_count);
res = -1;
}
if (alert_level > 0) {
ast_test_status_update(test, "Global alert failed to clear at %ld\n", queue_count);
res = -1;
}
}
}
data_cleanup:
for (i = 1; i <= TEST_DATA_ARRAY_SIZE; i++) {
ao2_cleanup(task_data[i]);
}
return res ? AST_TEST_FAIL : AST_TEST_PASS;
}
#define NUM_TASKS 20000
/*!
@ -749,6 +893,7 @@ static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(subsystem_alert);
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
@ -759,6 +904,7 @@ static int load_module(void)
{
ast_test_register(default_taskprocessor);
ast_test_register(default_taskprocessor_load);
ast_test_register(subsystem_alert);
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);