Multiple revisions 399887,400138,400178,400180-400181

........
  r399887 | dlee | 2013-09-26 10:41:47 -0500 (Thu, 26 Sep 2013) | 1 line
  
  Minor performance bump by not allocate manager variable struct if we don't need it
........
  r400138 | dlee | 2013-09-30 10:24:00 -0500 (Mon, 30 Sep 2013) | 23 lines
  
  Stasis performance improvements
  
  This patch addresses several performance problems that were found in
  the initial performance testing of Asterisk 12.
  
  The Stasis dispatch object was allocated as an AO2 object, even though
  it has a very confined lifecycle. This was replaced with a straight
  ast_malloc().
  
  The Stasis message router was spending an inordinate amount of time
  searching hash tables. In this case, most of our routers had 6 or
  fewer routes in them to begin with. This was replaced with an array
  that's searched linearly for the route.
  
  We more heavily rely on AO2 objects in Asterisk 12, and the memset()
  in ao2_ref() actually became noticeable on the profile. This was
  #ifdef'ed to only run when AO2_DEBUG was enabled.
  
  After being misled by an erroneous comment in taskprocessor.c during
  profiling, the wrong comment was removed.
  
  Review: https://reviewboard.asterisk.org/r/2873/
........
  r400178 | dlee | 2013-09-30 13:26:27 -0500 (Mon, 30 Sep 2013) | 24 lines
  
  Taskprocessor optimization; switch Stasis to use taskprocessors
  
  This patch optimizes taskprocessor to use a semaphore for signaling,
  which the OS can do a better job at managing contention and waiting
  that we can with a mutex and condition.
  
  The taskprocessor execution was also slightly optimized to reduce the
  number of locks taken.
  
  The only observable difference in the taskprocessor implementation is
  that when the final reference to the taskprocessor goes away, it will
  execute all tasks to completion instead of discarding the unexecuted
  tasks.
  
  For systems where unnamed semaphores are not supported, a really
  simple semaphore implementation is provided. (Which gives identical
  performance as the original taskprocessor implementation).
  
  The way we ended up implementing Stasis caused the threadpool to be a
  burden instead of a boost to performance. This was switched to just
  use taskprocessors directly for subscriptions.
  
  Review: https://reviewboard.asterisk.org/r/2881/
........
  r400180 | dlee | 2013-09-30 13:39:34 -0500 (Mon, 30 Sep 2013) | 28 lines
  
  Optimize how Stasis forwards are dispatched
  
  This patch optimizes how forwards are dispatched in Stasis.
  
  Originally, forwards were dispatched as subscriptions that are invoked
  on the publishing thread. This did not account for the vast number of
  forwards we would end up having in the system, and the amount of work it
  would take to walk though the forward subscriptions.
  
  This patch modifies Stasis so that rather than walking the tree of
  forwards on every dispatch, when forwards and subscriptions are changed,
  the subscriber list for every topic in the tree is changed.
  
  This has a couple of benefits. First, this reduces the workload of
  dispatching messages. It also reduces contention when dispatching to
  different topics that happen to forward to the same aggregation topic
  (as happens with all of the channel, bridge and endpoint topics).
  
  Since forwards are no longer subscriptions, the bulk of this patch is
  simply changing stasis_subscription objects to stasis_forward objects
  (which, admittedly, I should have done in the first place.)
  
  Since this required me to yet again put in a growing array, I finally
  abstracted that out into a set of ast_vector macros in
  asterisk/vector.h.
  
  Review: https://reviewboard.asterisk.org/r/2883/
........
  r400181 | dlee | 2013-09-30 13:48:57 -0500 (Mon, 30 Sep 2013) | 28 lines
  
  Remove dispatch object allocation from Stasis publishing
  
  While looking for areas for performance improvement, I realized that an
  unused feature in Stasis was negatively impacting performance.
  
  When a message is sent to a subscriber, a dispatch object is allocated
  for the dispatch, containing the topic the message was published to, the
  subscriber the message is being sent to, and the message itself.
  
  The topic is actually unused by any subscriber in Asterisk today. And
  the subscriber is associated with the taskprocessor the message is being
  dispatched to.
  
  First, this patch removes the unused topic parameter from Stasis
  subscription callbacks.
  
  Second, this patch introduces the concept of taskprocessor local data,
  data that may be set on a taskprocessor and provided along with the data
  pointer when a task is pushed using the ast_taskprocessor_push_local()
  call. This allows the task to have both data specific to that
  taskprocessor, in addition to data specific to that invocation.
  
  With those two changes, the dispatch object can be removed completely,
  and the message is simply refcounted and sent directly to the
  taskprocessor.
  
  Review: https://reviewboard.asterisk.org/r/2884/
........

Merged revisions 399887,400138,400178,400180-400181 from http://svn.asterisk.org/svn/asterisk/branches/12


git-svn-id: https://origsvn.digium.com/svn/asterisk/trunk@400186 65c4cc65-6c06-0410-ace0-fbb531ad65f3
This commit is contained in:
David M. Lee 2013-09-30 18:55:27 +00:00
parent db7c8691a0
commit 2de42c2a25
62 changed files with 1630 additions and 1174 deletions

View File

@ -1139,7 +1139,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talking_type);
STASIS_MESSAGE_TYPE_DEFN_LOCAL(meetme_talk_request_type);
static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message);
struct stasis_message *message);
static void meetme_stasis_cleanup(void)
{
@ -1226,7 +1226,7 @@ static int meetme_stasis_init(void)
}
static void meetme_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *channel_blob = stasis_message_data(message);
struct stasis_message_type *message_type;

View File

@ -1832,7 +1832,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_dump_type);
STASIS_MESSAGE_TYPE_DEFN_LOCAL(queue_agent_ringnoanswer_type);
static void queue_channel_manager_event(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
@ -1858,7 +1858,7 @@ static void queue_channel_manager_event(void *data,
}
static void queue_multi_channel_manager_event(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
@ -1902,7 +1902,7 @@ static void queue_multi_channel_manager_event(void *data,
}
static void queue_member_manager_event(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;
@ -2140,7 +2140,7 @@ static int is_member_available(struct call_queue *q, struct member *mem)
}
/*! \brief set a member's status based on device state of that member's interface*/
static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ao2_iterator miter, qiter;
struct ast_device_state_message *dev_state;
@ -5185,7 +5185,7 @@ static void send_agent_complete(const char *queuename, struct ast_channel_snapsh
}
static void queue_agent_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct ast_channel_blob *agent_blob;
@ -5401,7 +5401,7 @@ static void log_attended_transfer(struct queue_stasis_data *queue_data, struct a
* \param msg The stasis message for the bridge enter event
*/
static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_bridge_blob *enter_blob = stasis_message_data(msg);
@ -5434,7 +5434,7 @@ static void handle_bridge_enter(void *userdata, struct stasis_subscription *sub,
* \param msg The stasis message for the blind transfer event
*/
static void handle_blind_transfer(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_bridge_blob *blind_blob = stasis_message_data(msg);
@ -5503,7 +5503,7 @@ static void handle_blind_transfer(void *userdata, struct stasis_subscription *su
* \param msg The stasis message for the attended transfer event.
*/
static void handle_attended_transfer(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_attended_transfer_message *atxfer_msg = stasis_message_data(msg);
@ -5558,7 +5558,7 @@ static void handle_attended_transfer(void *userdata, struct stasis_subscription
* subroutines for further processing.
*/
static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
if (stasis_subscription_final_message(sub, msg)) {
ao2_cleanup(userdata);
@ -5578,7 +5578,7 @@ static void queue_bridge_cb(void *userdata, struct stasis_subscription *sub,
* \param msg The stasis message for the local optimization begin event
*/
static void handle_local_optimization_begin(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
@ -5630,7 +5630,7 @@ static void handle_local_optimization_begin(void *userdata, struct stasis_subscr
* \param msg The stasis message for the local optimization end event
*/
static void handle_local_optimization_end(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_multi_channel_blob *optimization_blob = stasis_message_data(msg);
@ -5695,7 +5695,7 @@ static void handle_local_optimization_end(void *userdata, struct stasis_subscrip
* \param msg The stasis message for the hangup event.
*/
static void handle_hangup(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct queue_stasis_data *queue_data = userdata;
struct ast_channel_blob *channel_blob = stasis_message_data(msg);
@ -5756,7 +5756,7 @@ static void handle_hangup(void *userdata, struct stasis_subscription *sub,
* subroutines for further processing.
*/
static void queue_channel_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
if (stasis_subscription_final_message(sub, msg)) {
ao2_cleanup(userdata);
@ -10336,7 +10336,7 @@ static const struct ast_data_entry queue_data_providers[] = {
};
static struct stasis_message_router *agent_router;
static struct stasis_subscription *topic_forwarder;
static struct stasis_forward *topic_forwarder;
static int unload_module(void)
{
@ -10364,7 +10364,7 @@ static int unload_module(void)
stasis_message_router_remove(message_router, queue_agent_ringnoanswer_type());
}
stasis_message_router_unsubscribe_and_join(agent_router);
topic_forwarder = stasis_unsubscribe(topic_forwarder);
topic_forwarder = stasis_forward_cancel(topic_forwarder);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_join_type);
STASIS_MESSAGE_TYPE_CLEANUP(queue_caller_leave_type);

View File

@ -12606,7 +12606,7 @@ static void mwi_sub_event_cb(struct stasis_subscription_change *change)
}
}
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct stasis_subscription_change *change;
/* Only looking for subscription change notices here */
@ -12629,7 +12629,7 @@ static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct
static int dump_cache(void *obj, void *arg, int flags)
{
struct stasis_message *msg = obj;
mwi_event_cb(NULL, NULL, NULL, msg);
mwi_event_cb(NULL, NULL, msg);
return 0;
}

View File

@ -224,63 +224,54 @@ static void confbridge_publish_manager_event(
}
static void confbridge_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeStart", NULL);
}
static void confbridge_end_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeEnd", NULL);
}
static void confbridge_leave_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeLeave", NULL);
}
static void confbridge_join_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeJoin", NULL);
}
static void confbridge_start_record_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeRecord", NULL);
}
static void confbridge_stop_record_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeStopRecord", NULL);
}
static void confbridge_mute_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeMute", NULL);
}
static void confbridge_unmute_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
confbridge_publish_manager_event(message, "ConfbridgeUnmute", NULL);
}
static void confbridge_talking_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, extra_text, NULL, ast_free);

View File

@ -553,7 +553,7 @@ static int restart_monitor(void);
static int dahdi_sendtext(struct ast_channel *c, const char *text);
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* This module does not handle MWI in an event-based manner. However, it
* subscribes to MWI for each mailbox that is configured so that the core

View File

@ -1270,8 +1270,8 @@ static void build_rand_pad(unsigned char *buf, ssize_t len);
static int get_unused_callno(enum callno_type type, int validated, callno_entry *entry);
static int replace_callno(const void *obj);
static void sched_delay_remove(struct sockaddr_in *sin, callno_entry entry);
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static struct ast_channel_tech iax2_tech = {
.type = "IAX2",
@ -1331,7 +1331,7 @@ static void iax2_lock_owner(int callno)
}
}
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* The MWI subscriptions exist just so the core knows we care about those
* mailboxes. However, we just grab the events out of the cache when it
@ -1378,7 +1378,7 @@ static int network_change_sched_cb(const void *data)
}
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
/* This callback is only concerned with network change messages from the system topic. */
if (stasis_message_type(message) != ast_network_change_type()) {
@ -1392,7 +1392,7 @@ static void network_change_stasis_cb(void *data, struct stasis_subscription *sub
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;

View File

@ -486,7 +486,7 @@ static struct ast_channel_tech mgcp_tech = {
.func_channel_read = acf_channel_read,
};
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* This module does not handle MWI in an event-based manner. However, it
* subscribes to MWI for each mailbox that is configured so that the core

View File

@ -1324,9 +1324,9 @@ static int sip_poke_noanswer(const void *data);
static int sip_poke_peer(struct sip_peer *peer, int force);
static void sip_poke_all_peers(void);
static void sip_peer_hold(struct sip_pvt *p, int hold);
static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_topic *, struct stasis_message *);
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
static void mwi_event_cb(void *, struct stasis_subscription *, struct stasis_message *);
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void sip_keepalive_all_peers(void);
/*--- Applications, functions, CLI and manager command helpers */
@ -16825,7 +16825,7 @@ static void sip_peer_hold(struct sip_pvt *p, int hold)
}
/*! \brief Receive MWI events that we have subscribed to */
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct sip_peer *peer = userdata;
if (stasis_subscription_final_message(sub, msg)) {
@ -16872,7 +16872,7 @@ static int network_change_sched_cb(const void *data)
return 0;
}
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void network_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
/* This callback is only concerned with network change messages from the system topic. */
if (stasis_message_type(message) != ast_network_change_type()) {
@ -28940,7 +28940,7 @@ static int restart_monitor(void)
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;

View File

@ -1639,7 +1639,7 @@ static int skinny_indicate(struct ast_channel *ast, int ind, const void *data, s
static int skinny_fixup(struct ast_channel *oldchan, struct ast_channel *newchan);
static int skinny_senddigit_begin(struct ast_channel *ast, char digit);
static int skinny_senddigit_end(struct ast_channel *ast, char digit, unsigned int duration);
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
static int skinny_dialer_cb(const void *data);
static int skinny_reload(void);
@ -2300,7 +2300,7 @@ static int skinny_register(struct skinny_req *req, struct skinnysession *s)
set_callforwards(l, NULL, SKINNY_CFWD_ALL|SKINNY_CFWD_BUSY|SKINNY_CFWD_NOANSWER);
register_exten(l);
/* initialize MWI on line and device */
mwi_event_cb(l, NULL, NULL, NULL);
mwi_event_cb(l, NULL, NULL);
AST_LIST_TRAVERSE(&l->sublines, subline, list) {
ast_extension_state_add(subline->context, subline->exten, skinny_extensionstate_cb, subline->container);
}
@ -3529,7 +3529,7 @@ static void update_connectedline(struct skinny_subchannel *sub, const void *data
send_callinfo(sub);
}
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct skinny_line *l = userdata;
struct skinny_device *d = l->device;

View File

@ -8892,7 +8892,7 @@ static void sig_pri_send_mwi_indication(struct sig_pri_span *pri, const char *vm
*
* \return Nothing
*/
static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void sig_pri_mwi_event_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct sig_pri_span *pri = userdata;
const char *mbox_context;

View File

@ -1,8 +0,0 @@
[threadpool]
;initial_size = 0 ; Initial size of the threadpool
;idle_timeout_sec = 20 ; Number of seconds a thread should be idle before dying
; ; 0 means threads never time out
;max_size = 200 ; Maximum number of threads in the threadpool
; ; 0 means no limit to the threads in the threadpool

567
configure vendored

File diff suppressed because it is too large Load Diff

View File

@ -808,11 +808,12 @@ AC_DEFINE([HAVE_PTHREAD_MUTEX_RECURSIVE_NP], 1, [Define to 1 if your system defi
AC_MSG_RESULT(no)
)
AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
save_LIBS="$LIBS"
save_CFLAGS="$CFLAGS"
LIBS="$PTHREAD_LIBS $LIBS"
CFLAGS="$CFLAGS $PTHREAD_CFLAGS"
AC_MSG_CHECKING(for pthread_rwlock_timedwrlock() in pthread.h)
AC_LINK_IFELSE(
[AC_LANG_PROGRAM(
[#include <pthread.h>
@ -826,6 +827,17 @@ AC_LINK_IFELSE(
ac_cv_pthread_rwlock_timedwrlock="no"
]
)
# Some platforms define sem_init(), but only support sem_open(). joyous.
AC_MSG_CHECKING(for working unnamed semaphores)
AC_RUN_IFELSE(
[AC_LANG_PROGRAM([#include <semaphore.h>],
[sem_t sem; return sem_init(&sem, 0, 0);])],
AC_MSG_RESULT(yes)
AC_DEFINE([HAS_WORKING_SEMAPHORE], 1, [Define to 1 if anonymous semaphores work.]),
AC_MSG_RESULT(no)
)
LIBS="$save_LIBS"
CFLAGS="$save_CFLAGS"
if test "${ac_cv_pthread_rwlock_timedwrlock}" = "yes"; then

View File

@ -649,7 +649,7 @@ struct test_cb_data {
sem_t sem;
};
static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void test_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct test_cb_data *cb_data = userdata;
if (stasis_message_type(msg) != ast_presence_state_message_type()) {

View File

@ -29,6 +29,9 @@
/* Define to 1 if using `alloca.c'. */
#undef C_ALLOCA
/* Define to 1 if anonymous semaphores work. */
#undef HAS_WORKING_SEMAPHORE
/* Define to 1 if you have the `acos' function. */
#undef HAVE_ACOS

157
include/asterisk/sem.h Normal file
View File

@ -0,0 +1,157 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef ASTERISK_SEMAPHORE_H
#define ASTERISK_SEMAPHORE_H
/*!
* \file Asterisk semaphore API
*
* This API is a thin wrapper around the POSIX semaphore API (when available),
* so see the POSIX documentation for further details.
*/
#ifdef HAS_WORKING_SEMAPHORE
/* Working semaphore implementation detected */
#include <semaphore.h>
struct ast_sem {
sem_t real_sem;
};
#define AST_SEM_VALUE_MAX SEM_VALUE_MAX
/* These are thin wrappers; might as well inline them */
static force_inline int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
{
return sem_init(&sem->real_sem, pshared, value);
}
static force_inline int ast_sem_destroy(struct ast_sem *sem)
{
return sem_destroy(&sem->real_sem);
}
static force_inline int ast_sem_post(struct ast_sem *sem)
{
return sem_post(&sem->real_sem);
}
static force_inline int ast_sem_wait(struct ast_sem *sem)
{
return sem_wait(&sem->real_sem);
}
static force_inline int ast_sem_getvalue(struct ast_sem *sem, int *sval)
{
return sem_getvalue(&sem->real_sem, sval);
}
#else
/* Unnamed semaphores don't work. Rolling our own, I guess... */
#include "asterisk/lock.h"
#include <limits.h>
struct ast_sem {
/*! Current count of this semaphore */
int count;
/*! Number of threads currently waiting for this semaphore */
int waiters;
/*! Mutual exclusion */
ast_mutex_t mutex;
/*! Condition for singalling waiters */
ast_cond_t cond;
};
#define AST_SEM_VALUE_MAX INT_MAX
/*!
* \brief Initialize a semaphore.
*
* \param sem Semaphore to initialize.
* \param pshared Pass true (nonzero) to share this thread between processes.
* Not be supported on all platforms, so be wary!
* But leave the parameter, to be compatible with the POSIX ABI
* in case we need to add support in the future.
* \param value Initial value of the semaphore.
*
* \return 0 on success.
* \return -1 on error, errno set to indicate error.
*/
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value);
/*!
* \brief Destroy a semaphore.
*
* Destroying a semaphore that other threads are currently blocked on produces
* undefined behavior.
*
* \param sem Semaphore to destroy.
*
* \return 0 on success.
* \return -1 on error, errno set to indicate error.
*/
int ast_sem_destroy(struct ast_sem *sem);
/*!
* \brief Increments the semaphore, unblocking a waiter if necessary.
*
* \param sem Semaphore to increment.
*
* \return 0 on success.
* \return -1 on error, errno set to indicate error.
*/
int ast_sem_post(struct ast_sem *sem);
/*!
* \brief Decrements the semaphore.
*
* If the semaphore's current value is zero, this function blocks until another
* thread posts (ast_sem_post()) to the semaphore (or is interrupted by a signal
* handler, which sets errno to EINTR).
*
* \param sem Semaphore to decrement.
*
* \return 0 on success.
* \return -1 on error, errno set to indicate error.
*/
int ast_sem_wait(struct ast_sem *sem);
/*!
* \brief Gets the current value of the semaphore.
*
* If threads are blocked on this semaphore, POSIX allows the return value to be
* either 0 or a negative number whose absolute value is the number of threads
* blocked. Don't assume that it will give you one or the other; Asterisk has
* been ported to just about everything.
*
* \param sem Semaphore to query.
* \param[out] sval Output value.
*
* \return 0 on success.
* \return -1 on error, errno set to indicate error.
*/
int ast_sem_getvalue(struct ast_sem *sem, int *sval);
#endif
#endif /* ASTERISK_SEMAPHORE_H */

View File

@ -347,18 +347,6 @@ const char *stasis_topic_name(const struct stasis_topic *topic);
*/
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message);
/*!
* \brief Publish a message from a specified topic to all the subscribers of a
* possibly different topic.
* \param topic Topic to publish message to.
* \param topic Original topic message was from.
* \param message Message
* \since 12
*/
void stasis_forward_message(struct stasis_topic *topic,
struct stasis_topic *publisher_topic,
struct stasis_message *message);
/*!
* \brief Wait for all pending messages on a given topic to be processed.
* \param topic Topic to await pending messages on.
@ -381,11 +369,10 @@ struct stasis_subscription;
/*!
* \brief Callback function type for Stasis subscriptions.
* \param data Data field provided with subscription.
* \param topic Topic to which the message was published.
* \param message Published message.
* \since 12
*/
typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
typedef void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message);
/*!
* \brief Create a subscription.
@ -464,6 +451,8 @@ int stasis_subscription_is_done(struct stasis_subscription *subscription);
struct stasis_subscription *stasis_unsubscribe_and_join(
struct stasis_subscription *subscription);
struct stasis_forward;
/*!
* \brief Create a subscription which forwards all messages from one topic to
* another.
@ -477,9 +466,11 @@ struct stasis_subscription *stasis_unsubscribe_and_join(
* \return \c NULL on error.
* \since 12
*/
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic);
struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward);
/*!
* \brief Get the unique ID for the subscription.
*
@ -579,8 +570,6 @@ struct stasis_message_type *stasis_cache_update_type(void);
* \since 12
*/
struct stasis_cache_update {
/*! \brief Topic that published \c new_snapshot */
struct stasis_topic *topic;
/*! \brief Convenience reference to snapshot type */
struct stasis_message_type *type;
/*! \brief Old value from the cache */
@ -884,16 +873,6 @@ int stasis_config_init(void);
*/
int stasis_wait_init(void);
struct ast_threadpool_options;
/*!
* \internal
* \brief Retrieves the Stasis threadpool configuration.
* \param[out] threadpool_options Filled with Stasis threadpool options.
*/
void stasis_config_get_threadpool_options(
struct ast_threadpool_options *threadpool_options);
/*! @} */
/*!

View File

@ -62,7 +62,7 @@ struct stasis_message;
*/
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message),
void (*stasis_subscription_cb)(void *data, struct stasis_subscription *sub, struct stasis_message *message),
void *data,
int needs_mailbox);

View File

@ -100,6 +100,9 @@ int stasis_message_router_is_done(struct stasis_message_router *router);
* updates for types not handled by routes added with
* stasis_message_router_add_cache_update().
*
* Adding multiple routes for the same message type results in undefined
* behavior.
*
* \param router Router to add the route to.
* \param message_type Type of message to route.
* \param callback Callback to forard messages of \a message_type to.
@ -121,6 +124,9 @@ int stasis_message_router_add(struct stasis_message_router *router,
* These are distinct from regular routes, so one could have both a regular
* route and a cache route for the same \a message_type.
*
* Adding multiple routes for the same message type results in undefined
* behavior.
*
* \param router Router to add the route to.
* \param message_type Subtype of cache update to route.
* \param callback Callback to forard messages of \a message_type to.
@ -138,6 +144,11 @@ int stasis_message_router_add_cache_update(struct stasis_message_router *router,
/*!
* \brief Remove a route from a message router.
*
* If a route is removed from another thread, there is no notification that
* all messages using this route have been processed. This typically means that
* the associated \c data pointer for this route must be kept until the
* route itself is disposed of.
*
* \param router Router to remove the route from.
* \param message_type Type of message to route.
*
@ -149,6 +160,11 @@ void stasis_message_router_remove(struct stasis_message_router *router,
/*!
* \brief Remove a cache route from a message router.
*
* If a route is removed from another thread, there is no notification that
* all messages using this route have been processed. This typically means that
* the associated \c data pointer for this route must be kept until the
* route itself is disposed of.
*
* \param router Router to remove the route from.
* \param message_type Type of message to route.
*

View File

@ -109,6 +109,7 @@ struct ast_taskprocessor_listener_callbacks {
* \param listener The listener
*/
void (*shutdown)(struct ast_taskprocessor_listener *listener);
void (*dtor)(struct ast_taskprocessor_listener *listener);
};
/*!
@ -174,6 +175,18 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
*/
struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *name, struct ast_taskprocessor_listener *listener);
/*!
* \brief Sets the local data associated with a taskprocessor.
*
* \since 12.0.0
*
* See ast_taskprocessor_push_local().
*
* \param tps Task processor.
* \param local_data Local data to associate with \a tps.
*/
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps, void *local_data);
/*!
* \brief Unreference the specified taskprocessor and its reference count will decrement.
*
@ -196,6 +209,32 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps);
*/
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap);
/*! \brief Local data parameter */
struct ast_taskprocessor_local {
/*! Local data, associated with the taskprocessor. */
void *local_data;
/*! Data pointer passed with this task. */
void *data;
};
/*!
* \brief Push a task into the specified taskprocessor queue and signal the
* taskprocessor thread.
*
* The callback receives a \ref ast_taskprocessor_local struct, which contains
* both the provided \a datap pointer, and any local data set on the
* taskprocessor with ast_taskprocessor_set_local().
*
* \param tps The taskprocessor structure
* \param task_exe The task handling function to push into the taskprocessor queue
* \param datap The data to be used by the task handling function
* \retval 0 success
* \retval -1 failure
* \since 12.0.0
*/
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps,
int (*task_exe)(struct ast_taskprocessor_local *local), void *datap);
/*!
* \brief Pop a task off the taskprocessor and execute it.
*

193
include/asterisk/vector.h Normal file
View File

@ -0,0 +1,193 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
#ifndef _ASTERISK_VECTOR_H
#define _ASTERISK_VECTOR_H
/*! \file
*
* \brief Vector container support.
*
* A vector is a variable length array, with properties that can be useful when
* order doesn't matter.
* - Appends are asymptotically constant time.
* - Unordered removes are constant time.
* - Search is linear time
*
* \author David M. Lee, II <dlee@digium.com>
* \since 12
*/
/*! \brief Define a vector structure */
#define ast_vector(type) \
struct { \
type *elems; \
size_t max; \
size_t current; \
}
/*!
* \brief Initialize a vector
*
* If \a size is 0, then no space will be allocated until the vector is
* appended to.
*
* \param vec Vector to initialize.
* \param size Initial size of the vector.
*
* \return 0 on success.
* \return Non-zero on failure.
*/
#define ast_vector_init(vec, size) ({ \
size_t __size = (size); \
size_t alloc_size = __size * sizeof(*(vec).elems); \
(vec).elems = alloc_size ? ast_malloc(alloc_size) : NULL; \
(vec).current = 0; \
if ((vec).elems) { \
(vec).max = __size; \
} else { \
(vec).max = 0; \
} \
alloc_size == 0 || (vec).elems != NULL ? 0 : -1; \
})
/*!
* \brief Deallocates this vector.
*
* If any code to free the elements of this vector need to be run, that should
* be done prior to this call.
*
* \param vec Vector to deallocate.
*/
#define ast_vector_free(vec) do { \
ast_free((vec).elems); \
(vec).elems = NULL; \
(vec).max = 0; \
(vec).current = 0; \
} while (0)
/*!
* \brief Append an element to a vector, growing the vector if needed.
*
* \param vec Vector to append to.
* \param elem Element to append.
*
* \return 0 on success.
* \return Non-zero on failure.
*/
#define ast_vector_append(vec, elem) ({ \
int res = 0; \
\
if ((vec).current + 1 > (vec).max) { \
size_t new_max = (vec).max ? 2 * (vec).max : 1; \
typeof((vec).elems) new_elems = ast_realloc( \
(vec).elems, new_max * sizeof(*new_elems)); \
if (new_elems) { \
(vec).elems = new_elems; \
(vec).max = new_max; \
} else { \
res = -1; \
} \
} \
\
if (res == 0) { \
(vec).elems[(vec).current++] = (elem); \
} \
res; \
})
/*!
* \brief Remove an element from a vector by index.
*
* Note that elements in the vector may be reordered, so that the remove can
* happen in constant time.
*
* \param vec Vector to remove from.
* \param idx Index of the element to remove.
* \return The element that was removed.
*/
#define ast_vector_remove_unordered(vec, idx) ({ \
typeof((vec).elems[0]) res; \
size_t __idx = (idx); \
ast_assert(__idx < (vec).current); \
res = (vec).elems[__idx]; \
(vec).elems[__idx] = (vec).elems[--(vec).current]; \
res; \
})
/*!
* \brief Remove an element from a vector that matches the given comparison
*
* \param vec Vector to remove from.
* \param value Value to pass into comparator.
* \param cmp Comparator function/macros (called as \c cmp(elem, value))
* \return 0 if element was removed.
* \return Non-zero if element was not in the vector.
*/
#define ast_vector_remove_cmp_unordered(vec, value, cmp) ({ \
int res = -1; \
size_t idx; \
typeof(value) __value = (value); \
for (idx = 0; idx < (vec).current; ++idx) { \
if (cmp((vec).elems[idx], __value)) { \
ast_vector_remove_unordered((vec), idx); \
res = 0; \
break; \
} \
} \
res; \
})
/*! \brief Default comparator for ast_vector_remove_elem_unordered() */
#define AST_VECTOR_DEFAULT_CMP(a, b) ((a) == (b))
/*!
* \brief Remove an element from a vector.
*
* \param vec Vector to remove from.
* \param elem Element to remove
* \return 0 if element was removed.
* \return Non-zero if element was not in the vector.
*/
#define ast_vector_remove_elem_unordered(vec, elem) ({ \
ast_vector_remove_cmp_unordered((vec), (elem), \
AST_VECTOR_DEFAULT_CMP); \
})
/*!
* \brief Get the number of elements in a vector.
*
* \param vec Vector to query.
* \return Number of elements in the vector.
*/
#define ast_vector_size(vec) (vec).current
/*!
* \brief Get an element from a vector.
*
* \param vec Vector to query.
* \param idx Index of the element to get.
*/
#define ast_vector_get(vec, idx) ({ \
size_t __idx = (idx); \
ast_assert(__idx < (vec).current); \
(vec).elems[__idx]; \
})
#endif /* _ASTERISK_VECTOR_H */

View File

@ -478,38 +478,23 @@ static int internal_ao2_ref(void *user_data, int delta, const char *file, int li
ast_atomic_fetchadd_int(&ao2.total_objects, -1);
#endif
/* In case someone uses an object after it's been freed */
obj->priv_data.magic = 0;
switch (obj->priv_data.options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
obj_mutex = INTERNAL_OBJ_MUTEX(user_data);
ast_mutex_destroy(&obj_mutex->mutex.lock);
/*
* For safety, zero-out the astobj2_lock header and also the
* first word of the user-data, which we make sure is always
* allocated.
*/
memset(obj_mutex, '\0', sizeof(*obj_mutex) + sizeof(void *) );
ast_free(obj_mutex);
break;
case AO2_ALLOC_OPT_LOCK_RWLOCK:
obj_rwlock = INTERNAL_OBJ_RWLOCK(user_data);
ast_rwlock_destroy(&obj_rwlock->rwlock.lock);
/*
* For safety, zero-out the astobj2_rwlock header and also the
* first word of the user-data, which we make sure is always
* allocated.
*/
memset(obj_rwlock, '\0', sizeof(*obj_rwlock) + sizeof(void *) );
ast_free(obj_rwlock);
break;
case AO2_ALLOC_OPT_LOCK_NOLOCK:
/*
* For safety, zero-out the astobj2 header and also the first
* word of the user-data, which we make sure is always
* allocated.
*/
memset(obj, '\0', sizeof(*obj) + sizeof(void *) );
ast_free(obj);
break;
default:
@ -575,14 +560,6 @@ static void *internal_ao2_alloc(size_t data_size, ao2_destructor_fn destructor_f
struct astobj2_lock *obj_mutex;
struct astobj2_rwlock *obj_rwlock;
if (data_size < sizeof(void *)) {
/*
* We always alloc at least the size of a void *,
* for debugging purposes.
*/
data_size = sizeof(void *);
}
switch (options & AO2_ALLOC_OPT_LOCK_MASK) {
case AO2_ALLOC_OPT_LOCK_MUTEX:
#if defined(__AST_DEBUG_MALLOC)

View File

@ -1397,7 +1397,7 @@ static void generic_monitor_instance_list_destructor(void *obj)
ast_free((char *)generic_list->device_name);
}
static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg);
static struct generic_monitor_instance_list *create_new_generic_list(struct ast_cc_monitor *monitor)
{
struct generic_monitor_instance_list *generic_list = ao2_t_alloc(sizeof(*generic_list),
@ -1471,7 +1471,7 @@ static int generic_monitor_devstate_tp_cb(void *data)
return 0;
}
static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void generic_monitor_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
/* Wow, it's cool that we've picked up on a state change, but we really want
* the actual work to be done in the core's taskprocessor execution thread
@ -2750,7 +2750,7 @@ static int cc_generic_agent_stop_ringing(struct ast_cc_agent *agent)
return 0;
}
static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void generic_agent_devstate_cb(void *userdata, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_cc_agent *agent = userdata;
enum ast_device_state new_state;

View File

@ -334,13 +334,13 @@ static struct ao2_container *active_cdrs_by_channel;
static struct stasis_message_router *stasis_router;
/*! \brief Our subscription for bridges */
static struct stasis_subscription *bridge_subscription;
static struct stasis_forward *bridge_subscription;
/*! \brief Our subscription for channels */
static struct stasis_subscription *channel_subscription;
static struct stasis_forward *channel_subscription;
/*! \brief Our subscription for parking */
static struct stasis_subscription *parking_subscription;
static struct stasis_forward *parking_subscription;
/*! \brief The parent topic for all topics we want to aggregate for CDRs */
static struct stasis_topic *cdr_topic;
@ -1839,7 +1839,7 @@ static int finalized_state_process_party_a(struct cdr_object *cdr, struct ast_ch
* \param topic The topic this message was published for
* \param message The message
*/
static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void handle_dial_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
@ -2020,7 +2020,7 @@ static int check_new_cdr_needed(struct ast_channel_snapshot *old_snapshot,
* \param topic The topic this message was published for
* \param message The message
*/
static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void handle_channel_cache_message(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
RAII_VAR(struct cdr_object *, cdr, NULL, ao2_cleanup);
RAII_VAR(struct module_config *, mod_cfg, ao2_global_obj_ref(module_configs), ao2_cleanup);
@ -2150,7 +2150,7 @@ static int filter_bridge_messages(struct ast_bridge_snapshot *bridge)
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_leave_message(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
@ -2450,7 +2450,7 @@ static void handle_standard_bridge_enter_message(struct cdr_object *cdr,
* \param message The message - hopefully a bridge one!
*/
static void handle_bridge_enter_message(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_bridge_blob *update = stasis_message_data(message);
struct ast_bridge_snapshot *bridge = update->bridge;
@ -2494,7 +2494,7 @@ static void handle_bridge_enter_message(void *data, struct stasis_subscription *
* \param message The message about who got parked
* */
static void handle_parked_call_message(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_parked_call_payload *payload = stasis_message_data(message);
struct ast_channel_snapshot *channel = payload->parkee;
@ -3884,9 +3884,9 @@ static int process_config(int reload)
static void cdr_engine_cleanup(void)
{
channel_subscription = stasis_unsubscribe_and_join(channel_subscription);
bridge_subscription = stasis_unsubscribe_and_join(bridge_subscription);
parking_subscription = stasis_unsubscribe_and_join(parking_subscription);
channel_subscription = stasis_forward_cancel(channel_subscription);
bridge_subscription = stasis_forward_cancel(bridge_subscription);
parking_subscription = stasis_forward_cancel(parking_subscription);
stasis_message_router_unsubscribe_and_join(stasis_router);
ao2_cleanup(cdr_topic);
cdr_topic = NULL;

View File

@ -121,16 +121,16 @@ static struct stasis_topic *cel_topic;
static struct stasis_topic *cel_aggregation_topic;
/*! Subscription for forwarding the channel caching topic */
static struct stasis_subscription *cel_channel_forwarder;
static struct stasis_forward *cel_channel_forwarder;
/*! Subscription for forwarding the channel caching topic */
static struct stasis_subscription *cel_bridge_forwarder;
static struct stasis_forward *cel_bridge_forwarder;
/*! Subscription for forwarding the parking topic */
static struct stasis_subscription *cel_parking_forwarder;
static struct stasis_forward *cel_parking_forwarder;
/*! Subscription for forwarding the CEL-specific topic */
static struct stasis_subscription *cel_cel_forwarder;
static struct stasis_forward *cel_cel_forwarder;
struct stasis_message_type *cel_generic_type(void);
STASIS_MESSAGE_TYPE_DEFN(cel_generic_type);
@ -1019,7 +1019,6 @@ static int cel_filter_channel_snapshot(struct ast_channel_snapshot *snapshot)
}
static void cel_snapshot_update_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_cache_update *update = stasis_message_data(message);
@ -1082,7 +1081,6 @@ static struct ast_str *cel_generate_peer_str(
static void cel_bridge_enter_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@ -1110,7 +1108,6 @@ static void cel_bridge_enter_cb(
static void cel_bridge_leave_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@ -1138,7 +1135,6 @@ static void cel_bridge_leave_cb(
static void cel_parking_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_parked_call_payload *parked_payload = stasis_message_data(message);
@ -1183,7 +1179,6 @@ static void save_dialstatus(struct ast_multi_channel_blob *blob)
}
static void cel_dial_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *blob = stasis_message_data(message);
@ -1218,7 +1213,6 @@ static void cel_dial_cb(void *data, struct stasis_subscription *sub,
static void cel_generic_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
@ -1241,7 +1235,6 @@ static void cel_generic_cb(
static void cel_blind_transfer_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *obj = stasis_message_data(message);
@ -1289,7 +1282,6 @@ static void cel_blind_transfer_cb(
static void cel_attended_transfer_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_attended_transfer_message *xfer = stasis_message_data(message);
@ -1342,7 +1334,6 @@ static void cel_attended_transfer_cb(
static void cel_pickup_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
@ -1364,7 +1355,6 @@ static void cel_pickup_cb(
static void cel_local_cb(
void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
@ -1394,10 +1384,10 @@ static void ast_cel_engine_term(void)
cel_aggregation_topic = NULL;
ao2_cleanup(cel_topic);
cel_topic = NULL;
cel_channel_forwarder = stasis_unsubscribe_and_join(cel_channel_forwarder);
cel_bridge_forwarder = stasis_unsubscribe_and_join(cel_bridge_forwarder);
cel_parking_forwarder = stasis_unsubscribe_and_join(cel_parking_forwarder);
cel_cel_forwarder = stasis_unsubscribe_and_join(cel_cel_forwarder);
cel_channel_forwarder = stasis_forward_cancel(cel_channel_forwarder);
cel_bridge_forwarder = stasis_forward_cancel(cel_bridge_forwarder);
cel_parking_forwarder = stasis_forward_cancel(cel_parking_forwarder);
cel_cel_forwarder = stasis_forward_cancel(cel_cel_forwarder);
ast_cli_unregister(&cli_status);
ao2_cleanup(cel_dialstatus_store);
cel_dialstatus_store = NULL;

View File

@ -7549,14 +7549,19 @@ struct varshead *ast_channel_get_manager_vars(struct ast_channel *chan)
RAII_VAR(struct ast_str *, tmp, NULL, ast_free);
struct manager_channel_variable *mcv;
ret = ao2_alloc(sizeof(*ret), varshead_dtor);
tmp = ast_str_create(16);
if (!ret || !tmp) {
return NULL;
}
AST_RWLIST_RDLOCK(&channelvars);
if (AST_LIST_EMPTY(&channelvars)) {
return NULL;
}
ret = ao2_alloc(sizeof(*ret), varshead_dtor);
tmp = ast_str_create(16);
AST_LIST_TRAVERSE(&channelvars, mcv, entry) {
const char *val = NULL;
struct ast_var_t *var;

View File

@ -207,8 +207,7 @@ struct ast_channel {
char sending_dtmf_digit; /*!< Digit this channel is currently sending out. (zero if not sending) */
struct timeval sending_dtmf_tv; /*!< The time this channel started sending the current digit. (Invalid if sending_dtmf_digit is zero.) */
struct stasis_cp_single *topics; /*!< Topic for all channel's events */
struct stasis_subscription *forwarder; /*!< Subscription for event forwarding to all topic */
struct stasis_subscription *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
struct stasis_forward *endpoint_forward; /*!< Subscription for event forwarding to endpoint's topic */
};
/*! \brief The monotonically increasing integer counter for channel uniqueids */
@ -1429,8 +1428,7 @@ void ast_channel_internal_cleanup(struct ast_channel *chan)
ast_string_field_free_memory(chan);
chan->forwarder = stasis_unsubscribe(chan->forwarder);
chan->endpoint_forward = stasis_unsubscribe(chan->endpoint_forward);
chan->endpoint_forward = stasis_forward_cancel(chan->endpoint_forward);
stasis_cp_single_unsubscribe(chan->topics);
chan->topics = NULL;

View File

@ -610,7 +610,7 @@ static int aggregate_state_changed(char *device, enum ast_device_state new_aggre
return 1;
}
static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void devstate_change_collector_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
enum ast_device_state aggregate_state;
char *device;

View File

@ -152,7 +152,7 @@ int ast_endpoint_add_channel(struct ast_endpoint *endpoint,
/*! \brief Handler for channel snapshot cache clears */
static void endpoint_cache_clear(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_endpoint *endpoint = data;
@ -174,7 +174,7 @@ static void endpoint_cache_clear(void *data,
}
static void endpoint_default(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
struct stasis_endpoint *endpoint = data;

View File

@ -1126,7 +1126,7 @@ static struct stasis_topic *manager_topic;
static struct stasis_message_router *stasis_router;
/*! \brief The \ref stasis_subscription for forwarding the RTP topic to the AMI topic */
static struct stasis_subscription *rtp_topic_forwarder;
static struct stasis_forward *rtp_topic_forwarder;
#define MGR_SHOW_TERMINAL_WIDTH 80
@ -1151,7 +1151,7 @@ static const struct {
{{ "restart", "gracefully", NULL }},
};
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message);
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message);
static void acl_change_stasis_subscribe(void)
{
@ -1427,7 +1427,6 @@ struct ast_str *ast_manager_str_from_json_object(struct ast_json *blob, key_excl
}
static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_manager_event_blob *, ev, NULL, ao2_cleanup);
@ -1444,7 +1443,6 @@ static void manager_default_msg_cb(void *data, struct stasis_subscription *sub,
}
static void manager_generic_msg_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_json_payload *payload = stasis_message_data(message);
@ -7640,7 +7638,6 @@ static void load_channelvars(struct ast_variable *var)
#ifdef TEST_FRAMEWORK
static void test_suite_event_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_test_suite_message_payload *payload;
@ -7759,7 +7756,7 @@ static void manager_shutdown(void)
stasis_message_router_unsubscribe_and_join(stasis_router);
stasis_router = NULL;
}
stasis_unsubscribe_and_join(rtp_topic_forwarder);
stasis_forward_cancel(rtp_topic_forwarder);
rtp_topic_forwarder = NULL;
ao2_cleanup(manager_topic);
manager_topic = NULL;
@ -8344,7 +8341,7 @@ static int __init_manager(int reload, int by_external_config)
}
static void acl_change_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
if (stasis_message_type(message) != ast_named_acl_change_type()) {
return;

View File

@ -106,7 +106,7 @@ static struct stasis_message_router *bridge_state_router;
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
static struct stasis_subscription *topic_forwarder;
static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_bridge_state_string_prefix(
const struct ast_bridge_snapshot *snapshot,
@ -180,7 +180,6 @@ bridge_snapshot_monitor bridge_monitors[] = {
};
static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, bridge_event_string, NULL, ast_free);
@ -221,7 +220,6 @@ static void bridge_snapshot_update(void *data, struct stasis_subscription *sub,
}
static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_merge_message *merge_msg = stasis_message_data(message);
@ -254,7 +252,6 @@ static void bridge_merge_cb(void *data, struct stasis_subscription *sub,
}
static void channel_enter_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
static const char *swap_name = "SwapUniqueid: ";
@ -283,7 +280,6 @@ static void channel_enter_cb(void *data, struct stasis_subscription *sub,
}
static void channel_leave_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_bridge_blob *blob = stasis_message_data(message);
@ -456,7 +452,7 @@ static int manager_bridge_info(struct mansession *s, const struct message *m)
static void manager_bridging_cleanup(void)
{
stasis_unsubscribe(topic_forwarder);
stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}

View File

@ -370,7 +370,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief The \ref stasis subscription returned by the forwarding of the channel topic
* to the manager topic
*/
static struct stasis_subscription *topic_forwarder;
static struct stasis_forward *topic_forwarder;
struct ast_str *ast_manager_build_channel_state_string_prefix(
const struct ast_channel_snapshot *snapshot,
@ -565,7 +565,6 @@ channel_snapshot_monitor channel_monitors[] = {
};
static void channel_snapshot_update(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@ -616,7 +615,7 @@ static int userevent_exclusion_cb(const char *key)
}
static void channel_user_event_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@ -667,7 +666,7 @@ static void publish_basic_channel_event(const char *event, int class, struct ast
}
static void channel_hangup_request_cb(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
@ -707,7 +706,7 @@ static void channel_hangup_request_cb(void *data,
}
static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
struct ast_channel_snapshot *spyer;
@ -730,7 +729,7 @@ static void channel_chanspy_stop_cb(void *data, struct stasis_subscription *sub,
}
static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, spyer_channel_string, NULL, ast_free);
RAII_VAR(struct ast_str *, spyee_channel_string, NULL, ast_free);
@ -765,7 +764,7 @@ static void channel_chanspy_start_cb(void *data, struct stasis_subscription *sub
}
static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@ -806,7 +805,7 @@ static void channel_dtmf_begin_cb(void *data, struct stasis_subscription *sub,
}
static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@ -853,7 +852,7 @@ static void channel_dtmf_end_cb(void *data, struct stasis_subscription *sub,
}
static void channel_hangup_handler_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
struct ast_channel_blob *payload = stasis_message_data(message);
@ -884,7 +883,7 @@ static void channel_hangup_handler_cb(void *data, struct stasis_subscription *su
}
static void channel_fax_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
RAII_VAR(struct ast_str *, event_buffer, ast_str_create(256), ast_free);
@ -957,7 +956,7 @@ static void channel_fax_cb(void *data, struct stasis_subscription *sub,
}
static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
struct ast_json *blob = payload->blob;
@ -977,7 +976,7 @@ static void channel_moh_start_cb(void *data, struct stasis_subscription *sub,
}
static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@ -985,7 +984,7 @@ static void channel_moh_stop_cb(void *data, struct stasis_subscription *sub,
}
static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@ -993,7 +992,7 @@ static void channel_monitor_start_cb(void *data, struct stasis_subscription *sub
}
static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *payload = stasis_message_data(message);
@ -1004,7 +1003,7 @@ static void channel_monitor_stop_cb(void *data, struct stasis_subscription *sub,
* \brief Callback processing messages for channel dialing
*/
static void channel_dial_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_multi_channel_blob *obj = stasis_message_data(message);
const char *dialstatus;
@ -1051,7 +1050,7 @@ static void channel_dial_cb(void *data, struct stasis_subscription *sub,
}
static void channel_hold_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
const char *musicclass;
@ -1083,7 +1082,7 @@ static void channel_hold_cb(void *data, struct stasis_subscription *sub,
}
static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_channel_blob *obj = stasis_message_data(message);
RAII_VAR(struct ast_str *, channel_event_string, NULL, ast_free);
@ -1100,7 +1099,7 @@ static void channel_unhold_cb(void *data, struct stasis_subscription *sub,
static void manager_channels_shutdown(void)
{
stasis_unsubscribe(topic_forwarder);
stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}

View File

@ -46,14 +46,9 @@ static void manager_endpoints_shutdown(void)
}
static void endpoint_state_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
/* XXX This looks wrong. Nothing should post or forward to a caching
* topic directly. Maybe ast_endpoint_topic() would be correct? I'd have
* to dig to make sure I don't break anything, though.
*/
stasis_forward_message(ast_manager_get_topic(), ast_endpoint_topic_all_cached(), message);
stasis_publish(ast_manager_get_topic(), message);
}
int manager_endpoints_init(void)

View File

@ -41,7 +41,7 @@ struct stasis_message_router *mwi_state_router;
/*! \brief The \ref stasis subscription returned by the forwarding of the MWI topic
* to the manager topic
*/
static struct stasis_subscription *topic_forwarder;
static struct stasis_forward *topic_forwarder;
/*! \brief Callback function used by \ref mwi_app_event_cb to weed out "Event" keys */
static int exclude_event_cb(const char *key)
@ -54,7 +54,6 @@ static int exclude_event_cb(const char *key)
/*! \brief Generic MWI event callback used for one-off events from voicemail modules */
static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_blob *payload = stasis_message_data(message);
@ -86,7 +85,6 @@ static void mwi_app_event_cb(void *data, struct stasis_subscription *sub,
}
static void mwi_update_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct ast_mwi_state *mwi_state;
@ -149,7 +147,7 @@ static void mwi_update_cb(void *data, struct stasis_subscription *sub,
static void manager_mwi_shutdown(void)
{
stasis_unsubscribe(topic_forwarder);
stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}

View File

@ -34,11 +34,11 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! \brief The \ref stasis subscription returned by the forwarding of the system topic
* to the manager topic
*/
static struct stasis_subscription *topic_forwarder;
static struct stasis_forward *topic_forwarder;
static void manager_system_shutdown(void)
{
stasis_unsubscribe(topic_forwarder);
stasis_forward_cancel(topic_forwarder);
topic_forwarder = NULL;
}

View File

@ -5111,7 +5111,7 @@ static void get_device_state_causing_channels(struct ao2_container *c)
ao2_iterator_destroy(&iter);
}
static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void device_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_device_state_message *dev_state;
struct ast_hint *hint;
@ -11369,7 +11369,7 @@ static int pbx_builtin_sayphonetic(struct ast_channel *chan, const char *data)
return res;
}
static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void presence_state_cb(void *unused, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_presence_state_message *presence_state = stasis_message_data(msg);
struct ast_hint *hint;

116
main/sem.c Normal file
View File

@ -0,0 +1,116 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Asterisk semaphore support.
*/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/sem.h"
#include "asterisk/utils.h"
#ifndef HAS_WORKING_SEMAPHORE
/* DIY semaphores! */
int ast_sem_init(struct ast_sem *sem, int pshared, unsigned int value)
{
if (pshared) {
/* Don't need it... yet */
errno = ENOSYS;
return -1;
}
/* Since value is unsigned, this will also catch attempts to init with
* a negative value */
if (value > AST_SEM_VALUE_MAX) {
errno = EINVAL;
return -1;
}
sem->count = value;
sem->waiters = 0;
ast_mutex_init(&sem->mutex);
ast_cond_init(&sem->cond, NULL);
return 0;
}
int ast_sem_destroy(struct ast_sem *sem)
{
ast_mutex_destroy(&sem->mutex);
ast_cond_destroy(&sem->cond);
return 0;
}
int ast_sem_post(struct ast_sem *sem)
{
SCOPED_MUTEX(lock, &sem->mutex);
ast_assert(sem->count >= 0);
if (sem->count == AST_SEM_VALUE_MAX) {
errno = EOVERFLOW;
return -1;
}
/* Give it up! */
++sem->count;
/* Release a waiter, if needed */
if (sem->waiters) {
ast_cond_signal(&sem->cond);
}
return 0;
}
int ast_sem_wait(struct ast_sem *sem)
{
SCOPED_MUTEX(lock, &sem->mutex);
ast_assert(sem->count >= 0);
/* Wait for a non-zero count */
++sem->waiters;
while (sem->count == 0) {
ast_cond_wait(&sem->cond, &sem->mutex);
}
--sem->waiters;
/* Take it! */
--sem->count;
return 0;
}
int ast_sem_getvalue(struct ast_sem *sem, int *sval)
{
SCOPED_MUTEX(lock, &sem->mutex);
ast_assert(sem->count >= 0);
*sval = sem->count;
return 0;
}
#endif

View File

@ -281,7 +281,7 @@ static void sounds_cleanup(void)
}
static void format_update_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
ast_sounds_reindex();
}

View File

@ -29,15 +29,15 @@
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
ASTERISK_FILE_VERSION(__FILE__, "$Revision$");
#include "asterisk/astobj2.h"
#include "asterisk/stasis_internal.h"
#include "asterisk/stasis.h"
#include "asterisk/threadpool.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/utils.h"
#include "asterisk/uuid.h"
#include "asterisk/vector.h"
/*!
* \page stasis-impl Stasis Implementation Notes
@ -134,24 +134,23 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
/*! The number of buckets to use for topic pools */
#define TOPIC_POOL_BUCKETS 57
/*! Threadpool for dispatching notifications to subscribers */
static struct ast_threadpool *pool;
STASIS_MESSAGE_TYPE_DEFN(stasis_subscription_change_type);
/*! \internal */
struct stasis_topic {
char *name;
/*! Variable length array of the subscribers */
struct stasis_subscription **subscribers;
/*! Allocated length of the subscribers array */
size_t num_subscribers_max;
/*! Current size of the subscribers array */
size_t num_subscribers_current;
ast_vector(struct stasis_subscription *) subscribers;
/*! Topics forwarding into this topic */
ast_vector(struct stasis_topic *) upstream_topics;
};
/* Forward declarations for the tightly-coupled subscription object */
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static int topic_add_subscription(struct stasis_topic *topic,
struct stasis_subscription *sub);
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub);
static void topic_dtor(void *obj)
{
@ -159,16 +158,18 @@ static void topic_dtor(void *obj)
/* Subscribers hold a reference to topics, so they should all be
* unsubscribed before we get here. */
ast_assert(topic->num_subscribers_current == 0);
ast_assert(ast_vector_size(topic->subscribers) == 0);
ast_free(topic->name);
topic->name = NULL;
ast_free(topic->subscribers);
topic->subscribers = NULL;
ast_vector_free(topic->subscribers);
ast_vector_free(topic->upstream_topics);
}
struct stasis_topic *stasis_topic_create(const char *name)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
int res = 0;
topic = ao2_alloc(sizeof(*topic), topic_dtor);
@ -181,9 +182,10 @@ struct stasis_topic *stasis_topic_create(const char *name)
return NULL;
}
topic->num_subscribers_max = INITIAL_SUBSCRIBERS_MAX;
topic->subscribers = ast_calloc(topic->num_subscribers_max, sizeof(*topic->subscribers));
if (!topic->subscribers) {
res |= ast_vector_init(topic->subscribers, INITIAL_SUBSCRIBERS_MAX);
res |= ast_vector_init(topic->upstream_topics, 0);
if (res != 0) {
return NULL;
}
@ -247,7 +249,6 @@ static void subscription_dtor(void *obj)
* \param message Message to send.
*/
static void subscription_invoke(struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
/* Notify that the final message has been received */
@ -258,7 +259,7 @@ static void subscription_invoke(struct stasis_subscription *sub,
}
/* Since sub is mostly immutable, no need to lock sub */
sub->callback(sub->data, sub, topic, message);
sub->callback(sub->data, sub, message);
/* Notify that the final message has been processed */
if (stasis_subscription_final_message(sub, message)) {
@ -268,7 +269,8 @@ static void subscription_invoke(struct stasis_subscription *sub,
}
}
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description);
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
static void send_subscription_unsubscribe(struct stasis_topic *topic, struct stasis_subscription *sub);
struct stasis_subscription *internal_stasis_subscribe(
struct stasis_topic *topic,
@ -286,10 +288,21 @@ struct stasis_subscription *internal_stasis_subscribe(
ast_uuid_generate_str(sub->uniqueid, sizeof(sub->uniqueid));
if (needs_mailbox) {
sub->mailbox = ast_threadpool_serializer(sub->uniqueid, pool);
/* With a small number of subscribers, a thread-per-sub is
* acceptable. If our usage changes so that we have larger
* numbers of subscribers, we'll probably want to consider
* a threadpool. We had that originally, but with so few
* subscribers it was actually a performance loss instead of
* a gain.
*/
sub->mailbox = ast_taskprocessor_get(sub->uniqueid,
TPS_REF_DEFAULT);
if (!sub->mailbox) {
return NULL;
}
ast_taskprocessor_set_local(sub->mailbox, sub);
/* Taskprocessor has a reference */
ao2_ref(sub, +1);
}
ao2_ref(topic, +1);
@ -302,7 +315,7 @@ struct stasis_subscription *internal_stasis_subscribe(
if (topic_add_subscription(topic, sub) != 0) {
return NULL;
}
send_subscription_change_message(topic, sub->uniqueid, "Subscribe");
send_subscription_subscribe(topic, sub);
ao2_ref(sub, +1);
return sub;
@ -316,29 +329,42 @@ struct stasis_subscription *stasis_subscribe(
return internal_stasis_subscribe(topic, callback, data, 1);
}
static int sub_cleanup(void *data)
{
struct stasis_subscription *sub = data;
ao2_cleanup(sub);
return 0;
}
struct stasis_subscription *stasis_unsubscribe(struct stasis_subscription *sub)
{
if (sub) {
size_t i;
/* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock. */
RAII_VAR(struct stasis_topic *, topic, ao2_bump(sub->topic),
ao2_cleanup);
SCOPED_AO2LOCK(lock_topic, topic);
/* The subscription may be the last ref to this topic. Hold
* the topic ref open until after the unlock. */
RAII_VAR(struct stasis_topic *, topic,
ao2_bump(sub ? sub->topic : NULL), ao2_cleanup);
for (i = 0; i < topic->num_subscribers_current; ++i) {
if (topic->subscribers[i] == sub) {
send_subscription_change_message(topic, sub->uniqueid, "Unsubscribe");
/* swap [i] with last entry; remove last entry */
topic->subscribers[i] = topic->subscribers[--topic->num_subscribers_current];
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
return NULL;
}
}
ast_log(LOG_ERROR, "Internal error: subscription has invalid topic\n");
if (!sub) {
return NULL;
}
/* We have to remove the subscription first, to ensure the unsubscribe
* is the final message */
if (topic_remove_subscription(sub->topic, sub) != 0) {
ast_log(LOG_ERROR,
"Internal error: subscription has invalid topic\n");
return NULL;
}
/* Now let everyone know about the unsubscribe */
send_subscription_unsubscribe(topic, sub);
/* When all that's done, remove the ref the mailbox has on the sub */
if (sub->mailbox) {
ast_taskprocessor_push(sub->mailbox, sub_cleanup, sub);
}
/* Unsubscribing unrefs the subscription */
ao2_cleanup(sub);
return NULL;
}
@ -388,8 +414,8 @@ int stasis_subscription_is_subscribed(const struct stasis_subscription *sub)
struct stasis_topic *topic = sub->topic;
SCOPED_AO2LOCK(lock_topic, topic);
for (i = 0; i < topic->num_subscribers_current; ++i) {
if (topic->subscribers[i] == sub) {
for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
if (ast_vector_get(topic->subscribers, i) == sub) {
return 1;
}
}
@ -431,74 +457,36 @@ int stasis_subscription_final_message(struct stasis_subscription *sub, struct st
*/
static int topic_add_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
struct stasis_subscription **subscribers;
size_t idx;
SCOPED_AO2LOCK(lock, topic);
/* Increase list size, if needed */
if (topic->num_subscribers_current + 1 > topic->num_subscribers_max) {
subscribers = realloc(topic->subscribers, 2 * topic->num_subscribers_max * sizeof(*subscribers));
if (!subscribers) {
return -1;
}
topic->subscribers = subscribers;
topic->num_subscribers_max *= 2;
}
/* The reference from the topic to the subscription is shared with
* the owner of the subscription, which will explicitly unsubscribe
* to release it.
*
* If we bumped the refcount here, the owner would have to unsubscribe
* and cleanup, which is a bit awkward. */
topic->subscribers[topic->num_subscribers_current++] = sub;
ast_vector_append(topic->subscribers, sub);
for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
topic_add_subscription(
ast_vector_get(topic->upstream_topics, idx), sub);
}
return 0;
}
/*!
* \internal
* \brief Information needed to dispatch a message to a subscription
*/
struct dispatch {
/*! Topic message was published to */
struct stasis_topic *topic;
/*! The message itself */
struct stasis_message *message;
/*! Subscription receiving the message */
struct stasis_subscription *sub;
};
static void dispatch_dtor(void *data)
static int topic_remove_subscription(struct stasis_topic *topic, struct stasis_subscription *sub)
{
struct dispatch *dispatch = data;
ao2_cleanup(dispatch->topic);
ao2_cleanup(dispatch->message);
ao2_cleanup(dispatch->sub);
}
size_t idx;
SCOPED_AO2LOCK(lock_topic, topic);
static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasis_message *message, struct stasis_subscription *sub)
{
RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
ast_assert(topic != NULL);
ast_assert(message != NULL);
ast_assert(sub != NULL);
dispatch = ao2_alloc(sizeof(*dispatch), dispatch_dtor);
if (!dispatch) {
return NULL;
for (idx = 0; idx < ast_vector_size(topic->upstream_topics); ++idx) {
topic_remove_subscription(
ast_vector_get(topic->upstream_topics, idx), sub);
}
dispatch->topic = topic;
ao2_ref(topic, +1);
dispatch->message = message;
ao2_ref(message, +1);
dispatch->sub = sub;
ao2_ref(sub, +1);
ao2_ref(dispatch, +1);
return dispatch;
return ast_vector_remove_elem_unordered(topic->subscribers, sub);
}
/*!
@ -506,16 +494,34 @@ static struct dispatch *dispatch_create(struct stasis_topic *topic, struct stasi
* \param data \ref dispatch object
* \return 0
*/
static int dispatch_exec(void *data)
static int dispatch_exec(struct ast_taskprocessor_local *local)
{
RAII_VAR(struct dispatch *, dispatch, data, ao2_cleanup);
struct stasis_subscription *sub = local->local_data;
struct stasis_message *message = local->data;
subscription_invoke(dispatch->sub, dispatch->topic, dispatch->message);
subscription_invoke(sub, message);
ao2_cleanup(message);
return 0;
}
void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *publisher_topic, struct stasis_message *message)
static void dispatch_message(struct stasis_subscription *sub,
struct stasis_message *message)
{
if (sub->mailbox) {
ao2_bump(message);
if (ast_taskprocessor_push_local(sub->mailbox, dispatch_exec, message) != 0) {
/* Push failed; ugh. */
ast_log(LOG_DEBUG, "Dropping dispatch\n");
ao2_cleanup(message);
}
} else {
/* Dispatch directly */
subscription_invoke(sub, message);
}
}
void stasis_publish(struct stasis_topic *_topic, struct stasis_message *message)
{
size_t i;
/* The topic may be unref'ed by the subscription invocation.
@ -525,70 +531,104 @@ void stasis_forward_message(struct stasis_topic *_topic, struct stasis_topic *pu
SCOPED_AO2LOCK(lock, topic);
ast_assert(topic != NULL);
ast_assert(publisher_topic != NULL);
ast_assert(message != NULL);
for (i = 0; i < topic->num_subscribers_current; ++i) {
struct stasis_subscription *sub = topic->subscribers[i];
for (i = 0; i < ast_vector_size(topic->subscribers); ++i) {
struct stasis_subscription *sub =
ast_vector_get(topic->subscribers, i);
ast_assert(sub != NULL);
if (sub->mailbox) {
RAII_VAR(struct dispatch *, dispatch, NULL, ao2_cleanup);
dispatch_message(sub, message);
}
}
dispatch = dispatch_create(publisher_topic, message, sub);
if (!dispatch) {
ast_log(LOG_DEBUG, "Dropping dispatch\n");
break;
}
/*!
* \brief Forwarding information
*
* Any message posted to \a from_topic is forwarded to \a to_topic.
*
* In cases where both the \a from_topic and \a to_topic need to be locked,
* always lock the \a to_topic first, then the \a from_topic. Lest you deadlock.
*/
struct stasis_forward {
/*! Originating topic */
struct stasis_topic *from_topic;
/*! Destination topic */
struct stasis_topic *to_topic;
};
if (ast_taskprocessor_push(sub->mailbox, dispatch_exec, dispatch) == 0) {
/* Ownership transferred to mailbox.
* Don't increment ref, b/c the task processor
* may have already gotten rid of the object.
*/
dispatch = NULL;
}
} else {
/* Dispatch directly */
subscription_invoke(sub, publisher_topic, message);
static void forward_dtor(void *obj)
{
struct stasis_forward *forward = obj;
ao2_cleanup(forward->from_topic);
forward->from_topic = NULL;
ao2_cleanup(forward->to_topic);
forward->to_topic = NULL;
}
struct stasis_forward *stasis_forward_cancel(struct stasis_forward *forward)
{
if (forward) {
int idx;
struct stasis_topic *from = forward->from_topic;
struct stasis_topic *to = forward->to_topic;
SCOPED_AO2LOCK(to_lock, to);
ast_vector_remove_elem_unordered(to->upstream_topics, from);
ao2_lock(from);
for (idx = 0; idx < ast_vector_size(to->subscribers); ++idx) {
topic_remove_subscription(
from, ast_vector_get(to->subscribers, idx));
}
ao2_unlock(from);
}
ao2_cleanup(forward);
return NULL;
}
void stasis_publish(struct stasis_topic *topic, struct stasis_message *message)
struct stasis_forward *stasis_forward_all(struct stasis_topic *from_topic,
struct stasis_topic *to_topic)
{
stasis_forward_message(topic, topic, message);
}
RAII_VAR(struct stasis_forward *, forward, NULL, ao2_cleanup);
/*! \brief Forwarding subscriber */
static void stasis_forward_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
{
struct stasis_topic *to_topic = data;
stasis_forward_message(to_topic, topic, message);
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(to_topic);
}
}
struct stasis_subscription *stasis_forward_all(struct stasis_topic *from_topic, struct stasis_topic *to_topic)
{
struct stasis_subscription *sub;
if (!from_topic || !to_topic) {
return NULL;
}
/* Forwarding subscriptions should dispatch directly instead of having a
* mailbox. Otherwise, messages forwarded to the same topic from
* different topics may get reordered. Which is bad.
*/
sub = internal_stasis_subscribe(from_topic, stasis_forward_cb, to_topic, 0);
if (sub) {
/* hold a ref to to_topic for this forwarding subscription */
ao2_ref(to_topic, +1);
forward = ao2_alloc(sizeof(*forward), forward_dtor);
if (!forward) {
return NULL;
}
return sub;
forward->from_topic = ao2_bump(from_topic);
forward->to_topic = ao2_bump(to_topic);
{
SCOPED_AO2LOCK(lock, to_topic);
int res;
res = ast_vector_append(to_topic->upstream_topics, from_topic);
if (res != 0) {
return NULL;
}
{
SCOPED_AO2LOCK(lock, from_topic);
size_t idx;
for (idx = 0; idx < ast_vector_size(to_topic->subscribers); ++idx) {
topic_add_subscription(from_topic, ast_vector_get(to_topic->subscribers, idx));
}
}
}
return ao2_bump(forward);
}
static void subscription_change_dtor(void *obj)
@ -598,7 +638,7 @@ static void subscription_change_dtor(void *obj)
ao2_cleanup(change->topic);
}
static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, char *uniqueid, char *description)
static struct stasis_subscription_change *subscription_change_alloc(struct stasis_topic *topic, const char *uniqueid, const char *description)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
@ -616,12 +656,15 @@ static struct stasis_subscription_change *subscription_change_alloc(struct stasi
return change;
}
static void send_subscription_change_message(struct stasis_topic *topic, char *uniqueid, char *description)
static void send_subscription_subscribe(struct stasis_topic *topic, struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
change = subscription_change_alloc(topic, uniqueid, description);
/* This assumes that we have already unsubscribed */
ast_assert(stasis_subscription_is_subscribed(sub));
change = subscription_change_alloc(topic, sub->uniqueid, "Subscribe");
if (!change) {
return;
@ -636,15 +679,42 @@ static void send_subscription_change_message(struct stasis_topic *topic, char *u
stasis_publish(topic, msg);
}
static void send_subscription_unsubscribe(struct stasis_topic *topic,
struct stasis_subscription *sub)
{
RAII_VAR(struct stasis_subscription_change *, change, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
/* This assumes that we have already unsubscribed */
ast_assert(!stasis_subscription_is_subscribed(sub));
change = subscription_change_alloc(topic, sub->uniqueid, "Unsubscribe");
if (!change) {
return;
}
msg = stasis_message_create(stasis_subscription_change_type(), change);
if (!msg) {
return;
}
stasis_publish(topic, msg);
/* Now we have to dispatch to the subscription itself */
dispatch_message(sub, msg);
}
struct topic_pool_entry {
struct stasis_subscription *forward;
struct stasis_forward *forward;
struct stasis_topic *topic;
};
static void topic_pool_entry_dtor(void *obj)
{
struct topic_pool_entry *entry = obj;
entry->forward = stasis_unsubscribe(entry->forward);
entry->forward = stasis_forward_cancel(entry->forward);
ao2_cleanup(entry->topic);
entry->topic = NULL;
}
@ -731,13 +801,6 @@ void stasis_log_bad_type_access(const char *name)
ast_log(LOG_ERROR, "Use of %s() before init/after destruction\n", name);
}
/*! \brief Shutdown function */
static void stasis_exit(void)
{
ast_threadpool_shutdown(pool);
pool = NULL;
}
/*! \brief Cleanup function for graceful shutdowns */
static void stasis_cleanup(void)
{
@ -748,36 +811,14 @@ int stasis_init(void)
{
int cache_init;
struct ast_threadpool_options opts;
/* Be sure the types are cleaned up after the message bus */
ast_register_cleanup(stasis_cleanup);
ast_register_atexit(stasis_exit);
if (stasis_config_init() != 0) {
ast_log(LOG_ERROR, "Stasis configuration failed\n");
return -1;
}
if (stasis_wait_init() != 0) {
ast_log(LOG_ERROR, "Stasis initialization failed\n");
return -1;
}
if (pool) {
ast_log(LOG_ERROR, "Stasis double-initialized\n");
return -1;
}
stasis_config_get_threadpool_options(&opts);
ast_debug(3, "Creating Stasis threadpool: initial_size = %d, max_size = %d, idle_timeout_secs = %d\n",
opts.initial_size, opts.max_size, opts.idle_timeout);
pool = ast_threadpool_create("stasis-core", NULL, &opts);
if (!pool) {
ast_log(LOG_ERROR, "Stasis threadpool allocation failed\n");
return -1;
}
cache_init = stasis_cache_init();
if (cache_init != 0) {
return -1;

View File

@ -339,8 +339,6 @@ struct stasis_message *stasis_cache_clear_create(struct stasis_message *id_messa
static void stasis_cache_update_dtor(void *obj)
{
struct stasis_cache_update *update = obj;
ao2_cleanup(update->topic);
update->topic = NULL;
ao2_cleanup(update->old_snapshot);
update->old_snapshot = NULL;
ao2_cleanup(update->new_snapshot);
@ -349,12 +347,11 @@ static void stasis_cache_update_dtor(void *obj)
update->type = NULL;
}
static struct stasis_message *update_create(struct stasis_topic *topic, struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
static struct stasis_message *update_create(struct stasis_message *old_snapshot, struct stasis_message *new_snapshot)
{
RAII_VAR(struct stasis_cache_update *, update, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, msg, NULL, ao2_cleanup);
ast_assert(topic != NULL);
ast_assert(old_snapshot != NULL || new_snapshot != NULL);
update = ao2_alloc_options(sizeof(*update), stasis_cache_update_dtor,
@ -363,8 +360,6 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
return NULL;
}
ao2_ref(topic, +1);
update->topic = topic;
if (old_snapshot) {
ao2_ref(old_snapshot, +1);
update->old_snapshot = old_snapshot;
@ -390,7 +385,7 @@ static struct stasis_message *update_create(struct stasis_topic *topic, struct s
}
static void caching_topic_exec(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
RAII_VAR(struct stasis_caching_topic *, caching_topic_needs_unref, NULL, ao2_cleanup);
struct stasis_caching_topic *caching_topic = data;
@ -418,7 +413,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
if (clear_id) {
old_snapshot = cache_put(caching_topic->cache, clear_type, clear_id, NULL);
if (old_snapshot) {
update = update_create(topic, old_snapshot, NULL);
update = update_create(old_snapshot, NULL);
stasis_publish(caching_topic->topic, update);
return;
}
@ -440,7 +435,7 @@ static void caching_topic_exec(void *data, struct stasis_subscription *sub,
old_snapshot = cache_put(caching_topic->cache, stasis_message_type(message), id, message);
update = update_create(topic, old_snapshot, message);
update = update_create(old_snapshot, message);
if (update == NULL) {
return;
}

View File

@ -39,15 +39,15 @@ struct stasis_cp_all {
struct stasis_topic *topic_cached;
struct stasis_cache *cache;
struct stasis_subscription *forward_all_to_cached;
struct stasis_forward *forward_all_to_cached;
};
struct stasis_cp_single {
struct stasis_topic *topic;
struct stasis_caching_topic *topic_cached;
struct stasis_subscription *forward_topic_to_all;
struct stasis_subscription *forward_cached_to_all;
struct stasis_forward *forward_topic_to_all;
struct stasis_forward *forward_cached_to_all;
};
static void all_dtor(void *obj)
@ -60,7 +60,7 @@ static void all_dtor(void *obj)
all->topic_cached = NULL;
ao2_cleanup(all->cache);
all->cache = NULL;
stasis_unsubscribe_and_join(all->forward_all_to_cached);
stasis_forward_cancel(all->forward_all_to_cached);
all->forward_all_to_cached = NULL;
}
@ -172,9 +172,9 @@ void stasis_cp_single_unsubscribe(struct stasis_cp_single *one)
return;
}
stasis_unsubscribe(one->forward_topic_to_all);
stasis_forward_cancel(one->forward_topic_to_all);
one->forward_topic_to_all = NULL;
stasis_unsubscribe(one->forward_cached_to_all);
stasis_forward_cancel(one->forward_cached_to_all);
one->forward_cached_to_all = NULL;
stasis_caching_unsubscribe(one->topic_cached);
one->topic_cached = NULL;

View File

@ -1,201 +0,0 @@
/*
* Asterisk -- An open source telephony toolkit.
*
* Copyright (C) 2013, Digium, Inc.
*
* David M. Lee, II <dlee@digium.com>
*
* See http://www.asterisk.org for more information about
* the Asterisk project. Please do not directly contact
* any of the maintainers of this project for assistance;
* the project provides a web site, mailing lists and IRC
* channels for your use.
*
* This program is free software, distributed under the terms of
* the GNU General Public License Version 2. See the LICENSE file
* at the top of the source tree.
*/
/*! \file
*
* \brief Stasis Message Bus configuration API.
*
* \author David M. Lee, II <dlee@digium.com>
*/
/*** MODULEINFO
<support_level>core</support_level>
***/
#include "asterisk.h"
ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/config_options.h"
#include "asterisk/stasis.h"
#include "asterisk/threadpool.h"
#include <limits.h>
/*** DOCUMENTATION
<configInfo name="stasis" language="en_US">
<synopsis>Stasis message bus configuration.</synopsis>
<configFile name="stasis.conf">
<configObject name="threadpool">
<synopsis>Threadpool configuration.</synopsis>
<configOption name="initial_size" default="0">
<synopsis>Initial number of threads in the message bus threadpool.</synopsis>
</configOption>
<configOption name="idle_timeout_sec" default="20">
<synopsis>Number of seconds for an idle thread to be disposed of.</synopsis>
</configOption>
<configOption name="max_size" default="200">
<synopsis>Maximum number of threads in the threadpool.</synopsis>
</configOption>
</configObject>
</configFile>
</configInfo>
***/
/*! \brief Locking container for safe configuration access. */
static AO2_GLOBAL_OBJ_STATIC(confs);
struct stasis_threadpool_conf {
int initial_size;
int idle_timeout_sec;
int max_size;
};
struct stasis_conf {
struct stasis_threadpool_conf *threadpool;
};
/*! \brief Mapping of the stasis conf struct's globals to the
* threadpool context in the config file. */
static struct aco_type threadpool_option = {
.type = ACO_GLOBAL,
.name = "threadpool",
.item_offset = offsetof(struct stasis_conf, threadpool),
.category = "^threadpool$",
.category_match = ACO_WHITELIST,
};
static struct aco_type *threadpool_options[] = ACO_TYPES(&threadpool_option);
#define CONF_FILENAME "stasis.conf"
/*! \brief The conf file that's processed for the module. */
static struct aco_file conf_file = {
/*! The config file name. */
.filename = CONF_FILENAME,
/*! The mapping object types to be processed. */
.types = ACO_TYPES(&threadpool_option),
};
static void conf_dtor(void *obj)
{
struct stasis_conf *conf = obj;
ao2_cleanup(conf->threadpool);
conf->threadpool = NULL;
}
static void *conf_alloc(void)
{
RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
conf = ao2_alloc_options(sizeof(*conf), conf_dtor,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!conf) {
return NULL;
}
conf->threadpool = ao2_alloc_options(sizeof(*conf->threadpool), NULL,
AO2_ALLOC_OPT_LOCK_NOLOCK);
if (!conf->threadpool) {
return NULL;
}
aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool);
ao2_ref(conf, +1);
return conf;
}
CONFIG_INFO_CORE("stasis", cfg_info, confs, conf_alloc,
.files = ACO_FILES(&conf_file));
void stasis_config_get_threadpool_options(
struct ast_threadpool_options *threadpool_options)
{
RAII_VAR(struct stasis_conf *, conf, NULL, ao2_cleanup);
conf = ao2_global_obj_ref(confs);
ast_assert(conf && conf->threadpool);
{
struct ast_threadpool_options newopts = {
.version = AST_THREADPOOL_OPTIONS_VERSION,
.initial_size = conf->threadpool->initial_size,
.auto_increment = 1,
.idle_timeout = conf->threadpool->idle_timeout_sec,
.max_size = conf->threadpool->max_size,
};
*threadpool_options = newopts;
}
}
/*! \brief Load (or reload) configuration. */
static int process_config(int reload)
{
RAII_VAR(struct stasis_conf *, conf, conf_alloc(), ao2_cleanup);
switch (aco_process_config(&cfg_info, reload)) {
case ACO_PROCESS_ERROR:
if (conf && !reload
&& !aco_set_defaults(&threadpool_option, "threadpool", conf->threadpool)) {
ast_log(AST_LOG_NOTICE, "Failed to process Stasis configuration; using defaults\n");
ao2_global_obj_replace_unref(confs, conf);
return 0;
}
return -1;
case ACO_PROCESS_OK:
case ACO_PROCESS_UNCHANGED:
break;
}
return 0;
}
static void config_exit(void)
{
aco_info_destroy(&cfg_info);
ao2_global_obj_release(confs);
}
int stasis_config_init(void)
{
if (aco_info_init(&cfg_info)) {
aco_info_destroy(&cfg_info);
return -1;
}
ast_register_atexit(config_exit);
/* threadpool section */
aco_option_register(&cfg_info, "initial_size", ACO_EXACT,
threadpool_options, "0", OPT_INT_T, PARSE_IN_RANGE,
FLDSET(struct stasis_threadpool_conf, initial_size), 0,
INT_MAX);
aco_option_register(&cfg_info, "idle_timeout_sec", ACO_EXACT,
threadpool_options, "20", OPT_INT_T, PARSE_IN_RANGE,
FLDSET(struct stasis_threadpool_conf, idle_timeout_sec), 0,
INT_MAX);
aco_option_register(&cfg_info, "max_size", ACO_EXACT,
threadpool_options, "200", OPT_INT_T, PARSE_IN_RANGE,
FLDSET(struct stasis_threadpool_conf, max_size), 0, INT_MAX);
return process_config(0);
}

View File

@ -34,9 +34,6 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/stasis_message_router.h"
/*! Number of hash buckets for the route table. Keep it prime! */
#define ROUTE_TABLE_BUCKETS 7
/*! \internal */
struct stasis_message_route {
/*! Message type handle by this route. */
@ -47,29 +44,79 @@ struct stasis_message_route {
void *data;
};
static void route_dtor(void *obj)
{
struct stasis_message_route *route = obj;
struct route_table {
/*! Current number of entries in the route table */
size_t current_size;
/*! Allocated number of entires in the route table */
size_t max_size;
/*! The route table itself */
struct stasis_message_route routes[];
};
ao2_cleanup(route->message_type);
route->message_type = NULL;
static struct stasis_message_route *table_find_route(struct route_table *table,
struct stasis_message_type *message_type)
{
size_t idx;
/* While a linear search for routes may seem very inefficient, most
* route tables have six routes or less. For such small data, it's
* hard to beat a linear search. If we start having larger route
* tables, then we can look into containers with more efficient
* lookups.
*/
for (idx = 0; idx < table->current_size; ++idx) {
if (table->routes[idx].message_type == message_type) {
return &table->routes[idx];
}
}
return NULL;
}
static int route_hash(const void *obj, const int flags)
static int table_add_route(struct route_table **table_ptr,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
const struct stasis_message_route *route = obj;
const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? obj : route->message_type;
struct route_table *table = *table_ptr;
struct stasis_message_route *route;
return ast_str_hash(stasis_message_type_name(message_type));
ast_assert(table_find_route(table, message_type) == NULL);
if (table->current_size + 1 > table->max_size) {
size_t new_max_size = table->max_size ? table->max_size * 2 : 1;
struct route_table *new_table = ast_realloc(table,
sizeof(*new_table) +
sizeof(new_table->routes[0]) * new_max_size);
if (!new_table) {
return -1;
}
*table_ptr = table = new_table;
table->max_size = new_max_size;
}
route = &table->routes[table->current_size++];
route->message_type = ao2_bump(message_type);
route->callback = callback;
route->data = data;
return 0;
}
static int route_cmp(void *obj, void *arg, int flags)
static int table_remove_route(struct route_table *table,
struct stasis_message_type *message_type)
{
const struct stasis_message_route *left = obj;
const struct stasis_message_route *right = arg;
const struct stasis_message_type *message_type = (flags & OBJ_KEY) ? arg : right->message_type;
size_t idx;
return (left->message_type == message_type) ? CMP_MATCH | CMP_STOP : 0;
for (idx = 0; idx < table->current_size; ++idx) {
if (table->routes[idx].message_type == message_type) {
ao2_cleanup(message_type);
table->routes[idx] =
table->routes[--table->current_size];
return 0;
}
}
return -1;
}
/*! \internal */
@ -77,11 +124,11 @@ struct stasis_message_router {
/*! Subscription to the upstream topic */
struct stasis_subscription *subscription;
/*! Subscribed routes */
struct ao2_container *routes;
/*! Subscribed routes for \ref stasi_cache_update messages */
struct ao2_container *cache_routes;
struct route_table *routes;
/*! Subscribed routes for \ref stasis_cache_update messages */
struct route_table *cache_routes;
/*! Route of last resort */
struct stasis_message_route *default_route;
struct stasis_message_route default_route;
};
static void router_dtor(void *obj)
@ -92,66 +139,60 @@ static void router_dtor(void *obj)
ast_assert(stasis_subscription_is_done(router->subscription));
router->subscription = NULL;
ao2_cleanup(router->routes);
ast_free(router->routes);
router->routes = NULL;
ao2_cleanup(router->cache_routes);
ast_free(router->cache_routes);
router->cache_routes = NULL;
ao2_cleanup(router->default_route);
router->default_route = NULL;
}
static struct stasis_message_route *find_route(
static int find_route(
struct stasis_message_router *router,
struct stasis_message *message)
struct stasis_message *message,
struct stasis_message_route *route_out)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
struct stasis_message_route *route = NULL;
struct stasis_message_type *type = stasis_message_type(message);
SCOPED_AO2LOCK(lock, router);
ast_assert(route_out != NULL);
if (type == stasis_cache_update_type()) {
/* Find a cache route */
struct stasis_cache_update *update =
stasis_message_data(message);
route = ao2_find(router->cache_routes, update->type, OBJ_KEY);
route = table_find_route(router->cache_routes, update->type);
}
if (route == NULL) {
/* Find a regular route */
route = ao2_find(router->routes, type, OBJ_KEY);
route = table_find_route(router->routes, type);
}
if (route == NULL) {
if (route == NULL && router->default_route.callback) {
/* Maybe the default route, then? */
if ((route = router->default_route)) {
ao2_ref(route, +1);
}
route = &router->default_route;
}
if (route == NULL) {
return NULL;
if (!route) {
return -1;
}
ao2_ref(route, +1);
return route;
*route_out = *route;
return 0;
}
static void router_dispatch(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct stasis_message_router *router = data;
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
struct stasis_message_route route;
route = find_route(router, message);
if (route) {
route->callback(route->data, sub, topic, message);
if (find_route(router, message, &route) == 0) {
route.callback(route.data, sub, message);
}
if (stasis_subscription_final_message(sub, message)) {
ao2_cleanup(router);
}
@ -167,14 +208,12 @@ struct stasis_message_router *stasis_message_router_create(
return NULL;
}
router->routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS, route_hash,
route_cmp);
router->routes = ast_calloc(1, sizeof(*router->routes));
if (!router->routes) {
return NULL;
}
router->cache_routes = ao2_container_alloc(ROUTE_TABLE_BUCKETS,
route_hash, route_cmp);
router->cache_routes = ast_calloc(1, sizeof(*router->cache_routes));
if (!router->cache_routes) {
return NULL;
}
@ -216,100 +255,27 @@ int stasis_message_router_is_done(struct stasis_message_router *router)
return stasis_subscription_is_done(router->subscription);
}
static struct stasis_message_route *route_create(
struct stasis_message_type *message_type,
stasis_subscription_cb callback,
void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
route = ao2_alloc(sizeof(*route), route_dtor);
if (!route) {
return NULL;
}
if (message_type) {
ao2_ref(message_type, +1);
}
route->message_type = message_type;
route->callback = callback;
route->data = data;
ao2_ref(route, +1);
return route;
}
static int add_route(struct stasis_message_router *router,
struct stasis_message_route *route)
{
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
existing_route = ao2_find(router->routes, route->message_type, OBJ_KEY);
if (existing_route) {
ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
ao2_link(router->routes, route);
return 0;
}
static int add_cache_route(struct stasis_message_router *router,
struct stasis_message_route *route)
{
RAII_VAR(struct stasis_message_route *, existing_route, NULL, ao2_cleanup);
SCOPED_AO2LOCK(lock, router);
existing_route = ao2_find(router->cache_routes, route->message_type,
OBJ_KEY);
if (existing_route) {
ast_log(LOG_ERROR, "Cannot add route; route exists\n");
return -1;
}
ao2_link(router->cache_routes, route);
return 0;
}
int stasis_message_router_add(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
route = route_create(message_type, callback, data);
if (!route) {
return -1;
}
return add_route(router, route);
SCOPED_AO2LOCK(lock, router);
return table_add_route(&router->routes, message_type, callback, data);
}
int stasis_message_router_add_cache_update(struct stasis_message_router *router,
struct stasis_message_type *message_type,
stasis_subscription_cb callback, void *data)
{
RAII_VAR(struct stasis_message_route *, route, NULL, ao2_cleanup);
route = route_create(message_type, callback, data);
if (!route) {
return -1;
}
return add_cache_route(router, route);
SCOPED_AO2LOCK(lock, router);
return table_add_route(&router->cache_routes, message_type, callback, data);
}
void stasis_message_router_remove(struct stasis_message_router *router,
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
ao2_find(router->routes, message_type,
OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
table_remove_route(router->routes, message_type);
}
void stasis_message_router_remove_cache_update(
@ -317,9 +283,7 @@ void stasis_message_router_remove_cache_update(
struct stasis_message_type *message_type)
{
SCOPED_AO2LOCK(lock, router);
ao2_find(router->cache_routes, message_type,
OBJ_UNLINK | OBJ_NODATA | OBJ_KEY);
table_remove_route(router->cache_routes, message_type);
}
int stasis_message_router_set_default(struct stasis_message_router *router,
@ -327,7 +291,8 @@ int stasis_message_router_set_default(struct stasis_message_router *router,
void *data)
{
SCOPED_AO2LOCK(lock, router);
ao2_cleanup(router->default_route);
router->default_route = route_create(NULL, callback, data);
return router->default_route ? 0 : -1;
router->default_route.callback = callback;
router->default_route.data = data;
/* While this implementation can never fail, it used to be able to */
return 0;
}

View File

@ -55,7 +55,7 @@ static void caching_guarantee_dtor(void *obj)
}
static void guarantee_handler(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
/* Wait for our particular message */
if (data == message) {

View File

@ -37,6 +37,7 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
#include "asterisk/astobj2.h"
#include "asterisk/cli.h"
#include "asterisk/taskprocessor.h"
#include "asterisk/sem.h"
/*!
* \brief tps_task structure is queued to a taskprocessor
@ -47,11 +48,15 @@ ASTERISK_FILE_VERSION(__FILE__, "$Revision$")
*/
struct tps_task {
/*! \brief The execute() task callback function pointer */
int (*execute)(void *datap);
union {
int (*execute)(void *datap);
int (*execute_local)(struct ast_taskprocessor_local *local);
} callback;
/*! \brief The data pointer for the task execute() function */
void *datap;
/*! \brief AST_LIST_ENTRY overhead */
AST_LIST_ENTRY(tps_task) list;
unsigned int wants_local:1;
};
/*! \brief tps_taskprocessor_stats maintain statistics for a taskprocessor. */
@ -68,6 +73,7 @@ struct ast_taskprocessor {
const char *name;
/*! \brief Taskprocessor statistics */
struct tps_taskprocessor_stats *stats;
void *local_data;
/*! \brief Taskprocessor current queue size */
long tps_queue_size;
/*! \brief Taskprocessor queue */
@ -113,9 +119,6 @@ static int tps_hash_cb(const void *obj, const int flags);
/*! \brief The astobj2 compare callback for taskprocessors */
static int tps_cmp_cb(void *obj, void *arg, int flags);
/*! \brief The task processing function executed by a taskprocessor */
static void *tps_processing_function(void *data);
/*! \brief Destroy the taskprocessor when its refcount reaches zero */
static void tps_taskprocessor_destroy(void *tps);
@ -138,47 +141,56 @@ static struct ast_cli_entry taskprocessor_clis[] = {
struct default_taskprocessor_listener_pvt {
pthread_t poll_thread;
ast_mutex_t lock;
ast_cond_t cond;
int wake_up;
int dead;
struct ast_sem sem;
};
static void default_tps_wake_up(struct default_taskprocessor_listener_pvt *pvt, int should_die)
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
{
SCOPED_MUTEX(lock, &pvt->lock);
pvt->wake_up = 1;
pvt->dead = should_die;
ast_cond_signal(&pvt->cond);
ast_assert(pvt->dead);
ast_sem_destroy(&pvt->sem);
ast_free(pvt);
}
static int default_tps_idle(struct default_taskprocessor_listener_pvt *pvt)
static void default_listener_pvt_dtor(struct ast_taskprocessor_listener *listener)
{
SCOPED_MUTEX(lock, &pvt->lock);
while (!pvt->wake_up) {
ast_cond_wait(&pvt->cond, lock);
}
pvt->wake_up = 0;
return pvt->dead;
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
default_listener_pvt_destroy(pvt);
listener->user_data = NULL;
}
/*!
* \brief Function that processes tasks in the taskprocessor
* \internal
*/
static void *tps_processing_function(void *data)
static void *default_tps_processing_function(void *data)
{
struct ast_taskprocessor_listener *listener = data;
struct ast_taskprocessor *tps = listener->tps;
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
int dead = 0;
int sem_value;
int res;
while (!dead) {
if (!ast_taskprocessor_execute(tps)) {
dead = default_tps_idle(pvt);
while (!pvt->dead) {
res = ast_sem_wait(&pvt->sem);
if (res != 0 && errno != EINTR) {
ast_log(LOG_ERROR, "ast_sem_wait(): %s\n",
strerror(errno));
/* Just give up */
break;
}
ast_taskprocessor_execute(tps);
}
/* No posting to a dead taskprocessor! */
res = ast_sem_getvalue(&pvt->sem, &sem_value);
ast_assert(res == 0 && sem_value == 0);
/* Free the shutdown reference (see default_listener_shutdown) */
ao2_t_ref(listener->tps, -1, "tps-shutdown");
return NULL;
}
@ -186,7 +198,7 @@ static int default_listener_start(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
if (ast_pthread_create(&pvt->poll_thread, NULL, tps_processing_function, listener)) {
if (ast_pthread_create(&pvt->poll_thread, NULL, default_tps_processing_function, listener)) {
return -1;
}
@ -197,33 +209,50 @@ static void default_task_pushed(struct ast_taskprocessor_listener *listener, int
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
ast_assert(!pvt->dead);
if (was_empty) {
default_tps_wake_up(pvt, 0);
if (ast_sem_post(&pvt->sem) != 0) {
ast_log(LOG_ERROR, "Failed to notify of enqueued task: %s\n",
strerror(errno));
}
}
static void default_listener_pvt_destroy(struct default_taskprocessor_listener_pvt *pvt)
static int default_listener_die(void *data)
{
ast_mutex_destroy(&pvt->lock);
ast_cond_destroy(&pvt->cond);
ast_free(pvt);
struct default_taskprocessor_listener_pvt *pvt = data;
pvt->dead = 1;
return 0;
}
static void default_listener_shutdown(struct ast_taskprocessor_listener *listener)
{
struct default_taskprocessor_listener_pvt *pvt = listener->user_data;
default_tps_wake_up(pvt, 1);
pthread_join(pvt->poll_thread, NULL);
int res;
/* Hold a reference during shutdown */
ao2_t_ref(listener->tps, +1, "tps-shutdown");
ast_taskprocessor_push(listener->tps, default_listener_die, pvt);
if (pthread_self() == pvt->poll_thread) {
res = pthread_detach(pvt->poll_thread);
if (res != 0) {
ast_log(LOG_ERROR, "pthread_detach(): %s\n",
strerror(errno));
}
} else {
res = pthread_join(pvt->poll_thread, NULL);
if (res != 0) {
ast_log(LOG_ERROR, "pthread_join(): %s\n",
strerror(errno));
}
}
pvt->poll_thread = AST_PTHREADT_NULL;
default_listener_pvt_destroy(pvt);
}
static const struct ast_taskprocessor_listener_callbacks default_listener_callbacks = {
.start = default_listener_start,
.task_pushed = default_task_pushed,
.shutdown = default_listener_shutdown,
.dtor = default_listener_pvt_dtor,
};
/*!
@ -258,19 +287,48 @@ int ast_tps_init(void)
static struct tps_task *tps_task_alloc(int (*task_exe)(void *datap), void *datap)
{
struct tps_task *t;
if ((t = ast_calloc(1, sizeof(*t)))) {
t->execute = task_exe;
t->datap = datap;
if (!task_exe) {
ast_log(LOG_ERROR, "task_exe is NULL!\n");
return NULL;
}
t = ast_calloc(1, sizeof(*t));
if (!t) {
ast_log(LOG_ERROR, "failed to allocate task!\n");
return NULL;
}
t->callback.execute = task_exe;
t->datap = datap;
return t;
}
static struct tps_task *tps_task_alloc_local(int (*task_exe)(struct ast_taskprocessor_local *local), void *datap)
{
struct tps_task *t;
if (!task_exe) {
ast_log(LOG_ERROR, "task_exe is NULL!\n");
return NULL;
}
t = ast_calloc(1, sizeof(*t));
if (!t) {
ast_log(LOG_ERROR, "failed to allocate task!\n");
return NULL;
}
t->callback.execute_local = task_exe;
t->datap = datap;
t->wants_local = 1;
return t;
}
/* release task resources */
static void *tps_task_free(struct tps_task *task)
{
if (task) {
ast_free(task);
}
ast_free(task);
return NULL;
}
@ -425,16 +483,10 @@ static void tps_taskprocessor_destroy(void *tps)
}
ast_debug(1, "destroying taskprocessor '%s'\n", t->name);
/* free it */
if (t->stats) {
ast_free(t->stats);
t->stats = NULL;
}
ast_free(t->stats);
t->stats = NULL;
ast_free((char *) t->name);
if (t->listener) {
/* This code should not be reached since the listener
* should have been destroyed before the taskprocessor could
* be destroyed
*/
ao2_ref(t->listener, -1);
t->listener = NULL;
}
@ -447,7 +499,6 @@ static void tps_taskprocessor_destroy(void *tps)
static struct tps_task *tps_taskprocessor_pop(struct ast_taskprocessor *tps)
{
struct tps_task *task;
SCOPED_AO2LOCK(lock, tps);
if ((task = AST_LIST_REMOVE_HEAD(&tps->tps_queue, list))) {
tps->tps_queue_size--;
@ -476,10 +527,21 @@ static void listener_shutdown(struct ast_taskprocessor_listener *listener)
ao2_ref(listener->tps, -1);
}
static void taskprocessor_listener_dtor(void *obj)
{
struct ast_taskprocessor_listener *listener = obj;
if (listener->callbacks->dtor) {
listener->callbacks->dtor(listener);
}
}
struct ast_taskprocessor_listener *ast_taskprocessor_listener_alloc(const struct ast_taskprocessor_listener_callbacks *callbacks, void *user_data)
{
RAII_VAR(struct ast_taskprocessor_listener *, listener,
ao2_alloc(sizeof(*listener), NULL), ao2_cleanup);
NULL, ao2_cleanup);
listener = ao2_alloc(sizeof(*listener), taskprocessor_listener_dtor);
if (!listener) {
return NULL;
@ -510,9 +572,12 @@ static void *default_listener_pvt_alloc(void)
if (!pvt) {
return NULL;
}
ast_cond_init(&pvt->cond, NULL);
ast_mutex_init(&pvt->lock);
pvt->poll_thread = AST_PTHREADT_NULL;
if (ast_sem_init(&pvt->sem, 0, 0) != 0) {
ast_log(LOG_ERROR, "ast_sem_init(): %s\n", strerror(errno));
ast_free(pvt);
return NULL;
}
return pvt;
}
@ -594,7 +659,6 @@ struct ast_taskprocessor *ast_taskprocessor_get(const char *name, enum ast_tps_o
p = __allocate_taskprocessor(name, listener);
if (!p) {
default_listener_pvt_destroy(pvt);
ao2_ref(listener, -1);
return NULL;
}
@ -615,6 +679,13 @@ struct ast_taskprocessor *ast_taskprocessor_create_with_listener(const char *nam
return __allocate_taskprocessor(name, listener);
}
void ast_taskprocessor_set_local(struct ast_taskprocessor *tps,
void *local_data)
{
SCOPED_AO2LOCK(lock, tps);
tps->local_data = local_data;
}
/* decrement the taskprocessor reference count and unlink from the container if necessary */
void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
{
@ -636,20 +707,21 @@ void *ast_taskprocessor_unreference(struct ast_taskprocessor *tps)
}
/* push the task into the taskprocessor queue */
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
static int taskprocessor_push(struct ast_taskprocessor *tps, struct tps_task *t)
{
struct tps_task *t;
int previous_size;
int was_empty;
if (!tps || !task_exe) {
ast_log(LOG_ERROR, "%s is missing!!\n", (tps) ? "task callback" : "taskprocessor");
if (!tps) {
ast_log(LOG_ERROR, "tps is NULL!\n");
return -1;
}
if (!(t = tps_task_alloc(task_exe, datap))) {
ast_log(LOG_ERROR, "failed to allocate task! Can't push to '%s'\n", tps->name);
if (!t) {
ast_log(LOG_ERROR, "t is NULL!\n");
return -1;
}
ao2_lock(tps);
AST_LIST_INSERT_TAIL(&tps->tps_queue, t, list);
previous_size = tps->tps_queue_size++;
@ -660,21 +732,43 @@ int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *
return 0;
}
int ast_taskprocessor_push(struct ast_taskprocessor *tps, int (*task_exe)(void *datap), void *datap)
{
return taskprocessor_push(tps, tps_task_alloc(task_exe, datap));
}
int ast_taskprocessor_push_local(struct ast_taskprocessor *tps, int (*task_exe)(struct ast_taskprocessor_local *datap), void *datap)
{
return taskprocessor_push(tps, tps_task_alloc_local(task_exe, datap));
}
int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
{
struct ast_taskprocessor_local local;
struct tps_task *t;
int size;
ao2_lock(tps);
t = tps_taskprocessor_pop(tps);
if (!t) {
ao2_unlock(tps);
return 0;
}
tps->executing = 1;
if (t->wants_local) {
local.local_data = tps->local_data;
local.data = t->datap;
}
ao2_unlock(tps);
t = tps_taskprocessor_pop(tps);
if (t) {
t->execute(t->datap);
tps_task_free(t);
if (t->wants_local) {
t->callback.execute_local(&local);
} else {
t->callback.execute(t->datap);
}
tps_task_free(t);
ao2_lock(tps);
/* We need to check size in the same critical section where we reset the
@ -684,7 +778,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
tps->executing = 0;
size = tps_taskprocessor_depth(tps);
/* If we executed a task, bump the stats */
if (t && tps->stats) {
if (tps->stats) {
tps->stats->_tasks_processed_count++;
if (size > tps->stats->max_qsize) {
tps->stats->max_qsize = size;
@ -693,7 +787,7 @@ int ast_taskprocessor_execute(struct ast_taskprocessor *tps)
ao2_unlock(tps);
/* If we executed a task, check for the transition to empty */
if (t && size == 0 && tps->listener->callbacks->emptied) {
if (size == 0 && tps->listener->callbacks->emptied) {
tps->listener->callbacks->emptied(tps->listener);
}
return size > 0;

View File

@ -739,7 +739,7 @@ announce_cleanup:
cap_slin = ast_format_cap_destroy(cap_slin);
}
static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void park_announce_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct park_announce_subscription_data *pa_data = data;
char *dial_string = pa_data->dial_string;

View File

@ -125,7 +125,7 @@ static void parker_parked_call_message_response(struct ast_parked_call_payload *
}
}
static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void parker_update_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
if (stasis_subscription_final_message(sub, message)) {
ast_free(data);

View File

@ -545,7 +545,7 @@ static void parked_call_message_response(struct ast_parked_call_payload *parked_
);
}
static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void parking_event_cb(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
if (stasis_message_type(message) == ast_parked_call_type()) {
struct ast_parked_call_payload *parked_call_message = stasis_message_data(message);

View File

@ -1040,7 +1040,7 @@ STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_exec_type);
STASIS_MESSAGE_TYPE_DEFN_LOCAL(agi_async_end_type);
static void agi_channel_manager_event(void *data,
struct stasis_subscription *sub, struct stasis_topic *topic,
struct stasis_subscription *sub,
struct stasis_message *message)
{
const char *type = data;

View File

@ -57,7 +57,7 @@ static struct stasis_message_router *router;
* \param message The message itself.
*/
static void statsmaker(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
RAII_VAR(struct ast_str *, metric, NULL, ast_free);
@ -89,7 +89,7 @@ static void statsmaker(void *data, struct stasis_subscription *sub,
* \param message The message itself.
*/
static void updates(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
/* Since this came from a message router, we know the type of the
* message. We can cast the data without checking its type.
@ -139,7 +139,7 @@ static void updates(void *data, struct stasis_subscription *sub,
* \param message The message itself.
*/
static void default_route(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
if (stasis_subscription_final_message(sub, message)) {
/* Much like with the regular subscription, you may need to

View File

@ -371,8 +371,8 @@ static void aji_pubsub_purge_nodes(struct aji_client *client,
const char* collection_name);
static void aji_publish_mwi(struct aji_client *client, const char *mailbox,
const char *context, const char *oldmsgs, const char *newmsgs);
static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg);
static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg);
static iks* aji_build_publish_skeleton(struct aji_client *client, const char *node,
const char *event_type, unsigned int cachable);
/* No transports in this version */
@ -3235,7 +3235,7 @@ int ast_aji_disconnect(struct aji_client *client)
* \param data void pointer to ast_client structure
* \return void
*/
static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
const char *mailbox;
const char *context;
@ -3269,7 +3269,7 @@ static void aji_mwi_cb(void *data, struct stasis_subscription *sub, struct stasi
* \param data void pointer to ast_client structure
* \return void
*/
static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void aji_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct aji_client *client = data;
struct ast_device_state_message *dev_state;
@ -3291,7 +3291,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
{
struct stasis_message *msg = obj;
struct aji_client *client = arg;
aji_devstate_cb(client, device_state_sub, NULL, msg);
aji_devstate_cb(client, device_state_sub, msg);
return 0;
}

View File

@ -9,6 +9,7 @@
#define RES_PJSIP_PRIVATE_H_
struct ao2_container;
struct ast_threadpool_options;
/*!
* \brief Initialize the configuration for res_pjsip

View File

@ -118,7 +118,7 @@ struct mwi_subscription {
};
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg);
struct stasis_message *msg);
static struct mwi_stasis_subscription *mwi_stasis_subscription_alloc(const char *mailbox, struct mwi_subscription *mwi_sub)
{
@ -603,7 +603,7 @@ static int serialized_cleanup(void *userdata)
}
static void mwi_stasis_cb(void *userdata, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *msg)
struct stasis_message *msg)
{
struct mwi_subscription *mwi_sub = userdata;

View File

@ -143,7 +143,7 @@ static int refer_progress_notify(void *data)
}
static void refer_progress_bridge(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct refer_progress *progress = data;
struct ast_bridge_blob *enter_blob;

View File

@ -117,7 +117,7 @@ static void security_event_stasis_cb(struct ast_json *json)
}
static void security_stasis_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct ast_json_payload *payload = stasis_message_data(message);

View File

@ -120,7 +120,7 @@ struct stasis_message_sink *stasis_message_sink_create(void)
* the initial lazy binding will still work as expected.
*/
static void message_sink_cb(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct stasis_message_sink *sink = data;

View File

@ -1318,7 +1318,7 @@ static void xmpp_pubsub_publish_device_state(struct ast_xmpp_client *client, con
* \param data void pointer to ast_client structure
* \return void
*/
static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_xmpp_client *client = data;
const char *mailbox, *context;
@ -1351,7 +1351,7 @@ static void xmpp_pubsub_mwi_cb(void *data, struct stasis_subscription *sub, stru
* \param data void pointer to ast_client structure
* \return void
*/
static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *msg)
static void xmpp_pubsub_devstate_cb(void *data, struct stasis_subscription *sub, struct stasis_message *msg)
{
struct ast_xmpp_client *client = data;
struct ast_device_state_message *dev_state;
@ -1566,7 +1566,7 @@ static int cached_devstate_cb(void *obj, void *arg, int flags)
{
struct stasis_message *msg = obj;
struct ast_xmpp_client *client = arg;
xmpp_pubsub_devstate_cb(client, client->device_state_sub, NULL, msg);
xmpp_pubsub_devstate_cb(client, client->device_state_sub, msg);
return 0;
}

View File

@ -58,9 +58,9 @@ struct app_forwards {
int interested;
/*! Forward for the regular topic */
struct stasis_subscription *topic_forward;
struct stasis_forward *topic_forward;
/*! Forward for the caching topic */
struct stasis_subscription *topic_cached_forward;
struct stasis_forward *topic_cached_forward;
/*! Unique id of the object being forwarded */
char id[];
@ -78,9 +78,9 @@ static void forwards_dtor(void *obj)
static void forwards_unsubscribe(struct app_forwards *forwards)
{
stasis_unsubscribe(forwards->topic_forward);
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
stasis_unsubscribe(forwards->topic_cached_forward);
stasis_forward_cancel(forwards->topic_cached_forward);
forwards->topic_cached_forward = NULL;
}
@ -129,7 +129,7 @@ static struct app_forwards *forwards_create_channel(struct app *app,
ast_channel_topic_cached(chan), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_unsubscribe(forwards->topic_forward);
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
@ -163,7 +163,7 @@ static struct app_forwards *forwards_create_bridge(struct app *app,
ast_bridge_topic_cached(bridge), app->topic);
if (!forwards->topic_cached_forward) {
/* Half-subscribed is a bad thing */
stasis_unsubscribe(forwards->topic_forward);
stasis_forward_cancel(forwards->topic_forward);
forwards->topic_forward = NULL;
return NULL;
}
@ -220,7 +220,7 @@ static void app_dtor(void *obj)
}
static void sub_default_handler(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct app *app = data;
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@ -363,7 +363,6 @@ static channel_snapshot_monitor channel_monitors[] = {
static void sub_channel_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
struct app *app = data;
@ -411,7 +410,6 @@ static struct ast_json *simple_bridge_event(
static void sub_bridge_update_handler(void *data,
struct stasis_subscription *sub,
struct stasis_topic *topic,
struct stasis_message *message)
{
RAII_VAR(struct ast_json *, json, NULL, ast_json_unref);
@ -447,7 +445,7 @@ static void sub_bridge_update_handler(void *data,
}
static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
struct app *app = data;
struct ast_bridge_merge_message *merge;
@ -476,7 +474,7 @@ static void bridge_merge_handler(void *data, struct stasis_subscription *sub,
}
/* Forward the message to the app */
stasis_forward_message(app->topic, topic, message);
stasis_publish(app->topic, message);
}
struct app *app_create(const char *name, stasis_app_cb handler, void *data)

View File

@ -309,7 +309,7 @@ static struct consumer *consumer_create(void) {
return consumer;
}
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@ -342,7 +342,7 @@ static void consumer_exec(void *data, struct stasis_subscription *sub, struct st
}
}
static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void consumer_finalize(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;

View File

@ -183,7 +183,7 @@ static struct consumer *consumer_create(int ignore_subscriptions) {
return consumer;
}
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_topic *topic, struct stasis_message *message)
static void consumer_exec(void *data, struct stasis_subscription *sub, struct stasis_message *message)
{
struct consumer *consumer = data;
RAII_VAR(struct consumer *, consumer_needs_cleanup, NULL, ao2_cleanup);
@ -427,7 +427,7 @@ AST_TEST_DEFINE(forward)
RAII_VAR(struct consumer *, parent_consumer, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, forward_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_forward *, forward_sub, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, parent_sub, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
@ -499,8 +499,8 @@ AST_TEST_DEFINE(interleaving)
RAII_VAR(struct stasis_message *, test_message2, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message *, test_message3, NULL, ao2_cleanup);
RAII_VAR(struct stasis_subscription *, forward_sub1, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_subscription *, forward_sub2, NULL, stasis_unsubscribe);
RAII_VAR(struct stasis_forward *, forward_sub1, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_forward *, forward_sub2, NULL, stasis_forward_cancel);
RAII_VAR(struct stasis_subscription *, sub, NULL, stasis_unsubscribe);
RAII_VAR(struct consumer *, consumer, NULL, ao2_cleanup);
@ -711,7 +711,6 @@ AST_TEST_DEFINE(cache)
/* Check for new snapshot messages */
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[0]));
actual_update = stasis_message_data(consumer->messages_rxed[0]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message1_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message1_1 == stasis_cache_get(cache, cache_type, "1"));
@ -720,7 +719,6 @@ AST_TEST_DEFINE(cache)
ast_test_validate(test, stasis_cache_update_type() == stasis_message_type(consumer->messages_rxed[1]));
actual_update = stasis_message_data(consumer->messages_rxed[1]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, NULL == actual_update->old_snapshot);
ast_test_validate(test, test_message2_1 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_1 == stasis_cache_get(cache, cache_type, "2"));
@ -736,7 +734,6 @@ AST_TEST_DEFINE(cache)
ast_test_validate(test, 3 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[2]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message2_1 == actual_update->old_snapshot);
ast_test_validate(test, test_message2_2 == actual_update->new_snapshot);
ast_test_validate(test, test_message2_2 == stasis_cache_get(cache, cache_type, "2"));
@ -752,7 +749,6 @@ AST_TEST_DEFINE(cache)
ast_test_validate(test, 4 == actual_len);
actual_update = stasis_message_data(consumer->messages_rxed[3]);
ast_test_validate(test, topic == actual_update->topic);
ast_test_validate(test, test_message1_1 == actual_update->old_snapshot);
ast_test_validate(test, NULL == actual_update->new_snapshot);
ast_test_validate(test, NULL == stasis_cache_get(cache, cache_type, "1"));
@ -867,52 +863,6 @@ AST_TEST_DEFINE(cache_dump)
return AST_TEST_PASS;
}
AST_TEST_DEFINE(route_conflicts)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
RAII_VAR(struct stasis_message_router *, uut, NULL, stasis_message_router_unsubscribe_and_join);
RAII_VAR(struct stasis_message_type *, test_message_type, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer1, NULL, ao2_cleanup);
RAII_VAR(struct consumer *, consumer2, NULL, ao2_cleanup);
int ret;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = test_category;
info->summary =
"Multiple routes to the same message_type should fail";
info->description =
"Multiple routes to the same message_type should fail";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
topic = stasis_topic_create("TestTopic");
ast_test_validate(test, NULL != topic);
consumer1 = consumer_create(1);
ast_test_validate(test, NULL != consumer1);
consumer2 = consumer_create(1);
ast_test_validate(test, NULL != consumer2);
test_message_type = stasis_message_type_create("TestMessage", NULL);
ast_test_validate(test, NULL != test_message_type);
uut = stasis_message_router_create(topic);
ast_test_validate(test, NULL != uut);
ret = stasis_message_router_add(
uut, test_message_type, consumer_exec, consumer1);
ast_test_validate(test, 0 == ret);
ret = stasis_message_router_add(
uut, test_message_type, consumer_exec, consumer2);
ast_test_validate(test, 0 != ret);
return AST_TEST_PASS;
}
AST_TEST_DEFINE(router)
{
RAII_VAR(struct stasis_topic *, topic, NULL, ao2_cleanup);
@ -1272,7 +1222,7 @@ AST_TEST_DEFINE(to_ami)
}
static void noop(void *data, struct stasis_subscription *sub,
struct stasis_topic *topic, struct stasis_message *message)
struct stasis_message *message)
{
/* no-op */
}
@ -1373,7 +1323,6 @@ static int unload_module(void)
AST_TEST_UNREGISTER(cache_filter);
AST_TEST_UNREGISTER(cache);
AST_TEST_UNREGISTER(cache_dump);
AST_TEST_UNREGISTER(route_conflicts);
AST_TEST_UNREGISTER(router);
AST_TEST_UNREGISTER(router_cache_updates);
AST_TEST_UNREGISTER(interleaving);
@ -1397,7 +1346,6 @@ static int load_module(void)
AST_TEST_REGISTER(cache_filter);
AST_TEST_REGISTER(cache);
AST_TEST_REGISTER(cache_dump);
AST_TEST_REGISTER(route_conflicts);
AST_TEST_REGISTER(router);
AST_TEST_REGISTER(router_cache_updates);
AST_TEST_REGISTER(interleaving);

View File

@ -264,11 +264,14 @@ AST_TEST_DEFINE(channel_messages)
type = stasis_message_type(msg);
ast_test_validate(test, ast_channel_snapshot_type() == type);
/* The ordering of the cache clear and endpoint snapshot are
* unspecified */
msg = sink->messages[3];
type = stasis_message_type(msg);
ast_test_validate(test, stasis_cache_clear_type() == type);
if (stasis_message_type(msg) == stasis_cache_clear_type()) {
/* Okay; the next message should be the endpoint snapshot */
msg = sink->messages[4];
}
msg = sink->messages[4];
type = stasis_message_type(msg);
ast_test_validate(test, ast_endpoint_snapshot_type() == type);
actual_snapshot = stasis_message_data(msg);

View File

@ -48,6 +48,31 @@ struct task_data {
int task_complete;
};
static void task_data_dtor(void *obj)
{
struct task_data *task_data = obj;
ast_mutex_destroy(&task_data->lock);
ast_cond_destroy(&task_data->cond);
}
/*! \brief Create a task_data object */
static struct task_data *task_data_create(void)
{
struct task_data *task_data =
ao2_alloc(sizeof(*task_data), task_data_dtor);
if (!task_data) {
return NULL;
}
ast_cond_init(&task_data->cond, NULL);
ast_mutex_init(&task_data->lock);
task_data->task_complete = 0;
return task_data;
}
/*!
* \brief Queued task for baseline test.
*
@ -64,6 +89,30 @@ static int task(void *data)
return 0;
}
/*!
* \brief Wait for a task to execute.
*/
static int task_wait(struct task_data *task_data)
{
struct timeval start = ast_tvnow();
struct timespec end;
SCOPED_MUTEX(lock, &task_data->lock);
end.tv_sec = start.tv_sec + 30;
end.tv_nsec = start.tv_usec * 1000;
while (!task_data->task_complete) {
int res;
res = ast_cond_timedwait(&task_data->cond, &task_data->lock,
&end);
if (res == ETIMEDOUT) {
return -1;
}
}
return 0;
}
/*!
* \brief Baseline test for default taskprocessor
*
@ -73,12 +122,9 @@ static int task(void *data)
*/
AST_TEST_DEFINE(default_taskprocessor)
{
struct ast_taskprocessor *tps;
struct task_data task_data;
struct timeval start;
struct timespec ts;
enum ast_test_result_state res = AST_TEST_PASS;
int timedwait_res;
RAII_VAR(struct ast_taskprocessor *, tps, NULL, ast_taskprocessor_unreference);
RAII_VAR(struct task_data *, task_data, NULL, ao2_cleanup);
int res;
switch (cmd) {
case TEST_INIT:
@ -99,36 +145,21 @@ AST_TEST_DEFINE(default_taskprocessor)
return AST_TEST_FAIL;
}
start = ast_tvnow();
ts.tv_sec = start.tv_sec + 30;
ts.tv_nsec = start.tv_usec * 1000;
ast_cond_init(&task_data.cond, NULL);
ast_mutex_init(&task_data.lock);
task_data.task_complete = 0;
ast_taskprocessor_push(tps, task, &task_data);
ast_mutex_lock(&task_data.lock);
while (!task_data.task_complete) {
timedwait_res = ast_cond_timedwait(&task_data.cond, &task_data.lock, &ts);
if (timedwait_res == ETIMEDOUT) {
break;
}
task_data = task_data_create();
if (!task_data) {
ast_test_status_update(test, "Unable to create task_data\n");
return AST_TEST_FAIL;
}
ast_mutex_unlock(&task_data.lock);
if (!task_data.task_complete) {
ast_taskprocessor_push(tps, task, task_data);
res = task_wait(task_data);
if (res != 0) {
ast_test_status_update(test, "Queued task did not execute!\n");
res = AST_TEST_FAIL;
goto test_end;
return AST_TEST_FAIL;
}
test_end:
tps = ast_taskprocessor_unreference(tps);
ast_mutex_destroy(&task_data.lock);
ast_cond_destroy(&task_data.cond);
return res;
return AST_TEST_PASS;
}
#define NUM_TASKS 20000
@ -631,12 +662,78 @@ AST_TEST_DEFINE(taskprocessor_shutdown)
return AST_TEST_PASS;
}
static int local_task_exe(struct ast_taskprocessor_local *local)
{
int *local_data = local->local_data;
struct task_data *task_data = local->data;
*local_data = 1;
task(task_data);
return 0;
}
AST_TEST_DEFINE(taskprocessor_push_local)
{
RAII_VAR(struct ast_taskprocessor *, tps, NULL,
ast_taskprocessor_unreference);
struct task_data *task_data;
int local_data;
int res;
switch (cmd) {
case TEST_INIT:
info->name = __func__;
info->category = "/main/taskprocessor/";
info->summary = "Test of pushing local data";
info->description =
"Ensures that local data is passed along.";
return AST_TEST_NOT_RUN;
case TEST_EXECUTE:
break;
}
tps = ast_taskprocessor_get("test", TPS_REF_DEFAULT);
if (!tps) {
ast_test_status_update(test, "Unable to create test taskprocessor\n");
return AST_TEST_FAIL;
}
task_data = task_data_create();
if (!task_data) {
ast_test_status_update(test, "Unable to create task_data\n");
return AST_TEST_FAIL;
}
local_data = 0;
ast_taskprocessor_set_local(tps, &local_data);
ast_taskprocessor_push_local(tps, local_task_exe, task_data);
res = task_wait(task_data);
if (res != 0) {
ast_test_status_update(test, "Queued task did not execute!\n");
return AST_TEST_FAIL;
}
if (local_data != 1) {
ast_test_status_update(test,
"Queued task did not set local_data!\n");
return AST_TEST_FAIL;
}
return AST_TEST_PASS;
}
static int unload_module(void)
{
ast_test_unregister(default_taskprocessor);
ast_test_unregister(default_taskprocessor_load);
ast_test_unregister(taskprocessor_listener);
ast_test_unregister(taskprocessor_shutdown);
ast_test_unregister(taskprocessor_push_local);
return 0;
}
@ -646,6 +743,7 @@ static int load_module(void)
ast_test_register(default_taskprocessor_load);
ast_test_register(taskprocessor_listener);
ast_test_register(taskprocessor_shutdown);
ast_test_register(taskprocessor_push_local);
return AST_MODULE_LOAD_SUCCESS;
}