#include "asterisk.h"
#include <sys/stat.h>
#include <errno.h>
#include <time.h>
#include <utime.h>
#include <stdlib.h>
#include <unistd.h>
#include <dirent.h>
#include <string.h>
#include <stdio.h>
#include "asterisk/lock.h"
#include "asterisk/file.h"
#include "asterisk/logger.h"
#include "asterisk/channel.h"
#include "asterisk/callerid.h"
#include "asterisk/pbx.h"
#include "asterisk/module.h"
#include "asterisk/options.h"
#include "asterisk/utils.h"
Include dependency graph for pbx_spool.c:
Go to the source code of this file.
Data Structures | |
struct | outgoing |
Enumerations | |
enum | { SPOOL_FLAG_ALWAYS_DELETE = (1 << 0), SPOOL_FLAG_ARCHIVE = (1 << 1) } |
Functions | |
static int | apply_outgoing (struct outgoing *o, char *fn, FILE *f) |
AST_MODULE_INFO_STANDARD (ASTERISK_GPL_KEY,"Outgoing Spool Support") | |
static void * | attempt_thread (void *data) |
static void | free_outgoing (struct outgoing *o) |
static void | init_outgoing (struct outgoing *o) |
static void | launch_service (struct outgoing *o) |
static int | load_module (void) |
static int | remove_from_queue (struct outgoing *o, const char *status) |
Remove a call file from the outgoing queue, optionally moving it into the archive directory. | |
static void | safe_append (struct outgoing *o, time_t now, char *s) |
static int | scan_service (char *fn, time_t now, time_t atime) |
static void * | scan_thread (void *unused) |
static int | unload_module (void) |
Variables | |
static char | qdir [255] |
static char | qdonedir [255] |
Definition in file pbx_spool.c.
anonymous enum |
Definition at line 56 of file pbx_spool.c.
/*! Per-call-file option bits stored in outgoing::options. */
enum {
	/*! Always delete the call file after a call succeeds or the
	 * maximum number of retries is exceeded, even if the
	 * modification time of the call file is in the future.
	 */
	SPOOL_FLAG_ALWAYS_DELETE = (1 << 0),
	/*! Don't unlink the call file after processing; move it into qdonedir instead. */
	SPOOL_FLAG_ARCHIVE = (1 << 1)
};
static int apply_outgoing | ( | struct outgoing * | o, | |
char * | fn, | |||
FILE * | f | |||
) | [static] |
Definition at line 126 of file pbx_spool.c.
References outgoing::account, outgoing::app, ast_callerid_split(), ast_log(), ast_set2_flag, ast_strlen_zero(), ast_true(), ast_variable_new(), outgoing::callingpid, outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, outgoing::fn, lineno, LOG_NOTICE, LOG_WARNING, outgoing::maxretries, outgoing::options, outgoing::priority, outgoing::retries, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, SPOOL_FLAG_ARCHIVE, strsep(), outgoing::tech, var, outgoing::vars, and outgoing::waittime.
Referenced by scan_service().
00127 { 00128 char buf[256]; 00129 char *c, *c2; 00130 int lineno = 0; 00131 struct ast_variable *var; 00132 00133 while(fgets(buf, sizeof(buf), f)) { 00134 lineno++; 00135 /* Trim comments */ 00136 c = buf; 00137 while ((c = strchr(c, '#'))) { 00138 if ((c == buf) || (*(c-1) == ' ') || (*(c-1) == '\t')) 00139 *c = '\0'; 00140 else 00141 c++; 00142 } 00143 00144 c = buf; 00145 while ((c = strchr(c, ';'))) { 00146 if ((c > buf) && (c[-1] == '\\')) { 00147 memmove(c - 1, c, strlen(c) + 1); 00148 c++; 00149 } else { 00150 *c = '\0'; 00151 break; 00152 } 00153 } 00154 00155 /* Trim trailing white space */ 00156 while(!ast_strlen_zero(buf) && buf[strlen(buf) - 1] < 33) 00157 buf[strlen(buf) - 1] = '\0'; 00158 if (!ast_strlen_zero(buf)) { 00159 c = strchr(buf, ':'); 00160 if (c) { 00161 *c = '\0'; 00162 c++; 00163 while ((*c) && (*c < 33)) 00164 c++; 00165 #if 0 00166 printf("'%s' is '%s' at line %d\n", buf, c, lineno); 00167 #endif 00168 if (!strcasecmp(buf, "channel")) { 00169 ast_copy_string(o->tech, c, sizeof(o->tech)); 00170 if ((c2 = strchr(o->tech, '/'))) { 00171 *c2 = '\0'; 00172 c2++; 00173 ast_copy_string(o->dest, c2, sizeof(o->dest)); 00174 } else { 00175 ast_log(LOG_NOTICE, "Channel should be in form Tech/Dest at line %d of %s\n", lineno, fn); 00176 o->tech[0] = '\0'; 00177 } 00178 } else if (!strcasecmp(buf, "callerid")) { 00179 ast_callerid_split(c, o->cid_name, sizeof(o->cid_name), o->cid_num, sizeof(o->cid_num)); 00180 } else if (!strcasecmp(buf, "application")) { 00181 ast_copy_string(o->app, c, sizeof(o->app)); 00182 } else if (!strcasecmp(buf, "data")) { 00183 ast_copy_string(o->data, c, sizeof(o->data)); 00184 } else if (!strcasecmp(buf, "maxretries")) { 00185 if (sscanf(c, "%d", &o->maxretries) != 1) { 00186 ast_log(LOG_WARNING, "Invalid max retries at line %d of %s\n", lineno, fn); 00187 o->maxretries = 0; 00188 } 00189 } else if (!strcasecmp(buf, "context")) { 00190 ast_copy_string(o->context, c, sizeof(o->context)); 00191 } else if 
(!strcasecmp(buf, "extension")) { 00192 ast_copy_string(o->exten, c, sizeof(o->exten)); 00193 } else if (!strcasecmp(buf, "priority")) { 00194 if ((sscanf(c, "%d", &o->priority) != 1) || (o->priority < 1)) { 00195 ast_log(LOG_WARNING, "Invalid priority at line %d of %s\n", lineno, fn); 00196 o->priority = 1; 00197 } 00198 } else if (!strcasecmp(buf, "retrytime")) { 00199 if ((sscanf(c, "%d", &o->retrytime) != 1) || (o->retrytime < 1)) { 00200 ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn); 00201 o->retrytime = 300; 00202 } 00203 } else if (!strcasecmp(buf, "waittime")) { 00204 if ((sscanf(c, "%d", &o->waittime) != 1) || (o->waittime < 1)) { 00205 ast_log(LOG_WARNING, "Invalid retrytime at line %d of %s\n", lineno, fn); 00206 o->waittime = 45; 00207 } 00208 } else if (!strcasecmp(buf, "retry")) { 00209 o->retries++; 00210 } else if (!strcasecmp(buf, "startretry")) { 00211 if (sscanf(c, "%ld", &o->callingpid) != 1) { 00212 ast_log(LOG_WARNING, "Unable to retrieve calling PID!\n"); 00213 o->callingpid = 0; 00214 } 00215 } else if (!strcasecmp(buf, "endretry") || !strcasecmp(buf, "abortretry")) { 00216 o->callingpid = 0; 00217 o->retries++; 00218 } else if (!strcasecmp(buf, "delayedretry")) { 00219 } else if (!strcasecmp(buf, "setvar") || !strcasecmp(buf, "set")) { 00220 c2 = c; 00221 strsep(&c2, "="); 00222 if (c2) { 00223 var = ast_variable_new(c, c2); 00224 if (var) { 00225 var->next = o->vars; 00226 o->vars = var; 00227 } 00228 } else 00229 ast_log(LOG_WARNING, "Malformed \"%s\" argument. 
Should be \"%s: variable=value\"\n", buf, buf); 00230 } else if (!strcasecmp(buf, "account")) { 00231 ast_copy_string(o->account, c, sizeof(o->account)); 00232 } else if (!strcasecmp(buf, "alwaysdelete")) { 00233 ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ALWAYS_DELETE); 00234 } else if (!strcasecmp(buf, "archive")) { 00235 ast_set2_flag(&o->options, ast_true(c), SPOOL_FLAG_ARCHIVE); 00236 } else { 00237 ast_log(LOG_WARNING, "Unknown keyword '%s' at line %d of %s\n", buf, lineno, fn); 00238 } 00239 } else 00240 ast_log(LOG_NOTICE, "Syntax error at line %d of %s\n", lineno, fn); 00241 } 00242 } 00243 ast_copy_string(o->fn, fn, sizeof(o->fn)); 00244 if (ast_strlen_zero(o->tech) || ast_strlen_zero(o->dest) || (ast_strlen_zero(o->app) && ast_strlen_zero(o->exten))) { 00245 ast_log(LOG_WARNING, "At least one of app or extension must be specified, along with tech and dest in file %s\n", fn); 00246 return -1; 00247 } 00248 return 0; 00249 }
AST_MODULE_INFO_STANDARD | ( | ASTERISK_GPL_KEY | , | |
"Outgoing Spool Support" | ||||
) |
static void* attempt_thread | ( | void * | data | ) | [static] |
Definition at line 327 of file pbx_spool.c.
References outgoing::account, outgoing::app, AST_FORMAT_SLINEAR, ast_log(), ast_pbx_outgoing_app(), ast_pbx_outgoing_exten(), ast_strlen_zero(), ast_verbose(), outgoing::cid_name, outgoing::cid_num, outgoing::context, outgoing::data, outgoing::dest, outgoing::exten, free_outgoing(), LOG_EVENT, LOG_NOTICE, outgoing::maxretries, option_verbose, outgoing::priority, remove_from_queue(), outgoing::retries, safe_append(), outgoing::tech, outgoing::vars, VERBOSE_PREFIX_3, and outgoing::waittime.
Referenced by launch_service().
00328 { 00329 struct outgoing *o = data; 00330 int res, reason; 00331 if (!ast_strlen_zero(o->app)) { 00332 if (option_verbose > 2) 00333 ast_verbose(VERBOSE_PREFIX_3 "Attempting call on %s/%s for application %s(%s) (Retry %d)\n", o->tech, o->dest, o->app, o->data, o->retries); 00334 res = ast_pbx_outgoing_app(o->tech, AST_FORMAT_SLINEAR, o->dest, o->waittime * 1000, o->app, o->data, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL); 00335 } else { 00336 if (option_verbose > 2) 00337 ast_verbose(VERBOSE_PREFIX_3 "Attempting call on %s/%s for %s@%s:%d (Retry %d)\n", o->tech, o->dest, o->exten, o->context,o->priority, o->retries); 00338 res = ast_pbx_outgoing_exten(o->tech, AST_FORMAT_SLINEAR, o->dest, o->waittime * 1000, o->context, o->exten, o->priority, &reason, 2 /* wait to finish */, o->cid_num, o->cid_name, o->vars, o->account, NULL); 00339 } 00340 if (res) { 00341 ast_log(LOG_NOTICE, "Call failed to go through, reason %d\n", reason); 00342 if (o->retries >= o->maxretries + 1) { 00343 /* Max retries exceeded */ 00344 ast_log(LOG_EVENT, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : ""); 00345 remove_from_queue(o, "Expired"); 00346 } else { 00347 /* Notate that the call is still active */ 00348 safe_append(o, time(NULL), "EndRetry"); 00349 } 00350 } else { 00351 ast_log(LOG_NOTICE, "Call completed to %s/%s\n", o->tech, o->dest); 00352 ast_log(LOG_EVENT, "Queued call to %s/%s completed\n", o->tech, o->dest); 00353 remove_from_queue(o, "Completed"); 00354 } 00355 free_outgoing(o); 00356 return NULL; 00357 }
static void free_outgoing | ( | struct outgoing * | o | ) | [static] |
Definition at line 121 of file pbx_spool.c.
References free.
Referenced by attempt_thread(), launch_service(), and scan_service().
/*!
 * \brief Release an outgoing call description.
 *
 * Only the structure itself is freed.
 * NOTE(review): the variable list hanging off outgoing::vars is not released
 * here -- confirm who owns it on the paths where no call is launched.
 *
 * \param o structure to release
 */
static void free_outgoing(struct outgoing *o)
{
	free(o);
}
static void init_outgoing | ( | struct outgoing * | o | ) | [static] |
Definition at line 112 of file pbx_spool.c.
References ast_set_flag, outgoing::options, outgoing::priority, outgoing::retrytime, SPOOL_FLAG_ALWAYS_DELETE, and outgoing::waittime.
Referenced by scan_service().
00113 { 00114 memset(o, 0, sizeof(struct outgoing)); 00115 o->priority = 1; 00116 o->retrytime = 300; 00117 o->waittime = 45; 00118 ast_set_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE); 00119 }
static void launch_service | ( | struct outgoing * | o | ) | [static] |
Definition at line 359 of file pbx_spool.c.
References ast_log(), ast_pthread_create, attempt_thread(), free_outgoing(), LOG_WARNING, and t.
Referenced by scan_service().
00360 { 00361 pthread_t t; 00362 pthread_attr_t attr; 00363 int ret; 00364 pthread_attr_init(&attr); 00365 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 00366 if ((ret = ast_pthread_create(&t,&attr,attempt_thread, o)) != 0) { 00367 ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret); 00368 free_outgoing(o); 00369 } 00370 pthread_attr_destroy(&attr); 00371 }
static int load_module | ( | void | ) | [static] |
Definition at line 486 of file pbx_spool.c.
References ast_config_AST_SPOOL_DIR, ast_log(), ast_pthread_create_background, LOG_WARNING, scan_thread(), and thread.
00487 { 00488 pthread_t thread; 00489 pthread_attr_t attr; 00490 int ret; 00491 snprintf(qdir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing"); 00492 if (mkdir(qdir, 0700) && (errno != EEXIST)) { 00493 ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool disabled\n", qdir); 00494 return 0; 00495 } 00496 snprintf(qdonedir, sizeof(qdir), "%s/%s", ast_config_AST_SPOOL_DIR, "outgoing_done"); 00497 pthread_attr_init(&attr); 00498 pthread_attr_setdetachstate(&attr, PTHREAD_CREATE_DETACHED); 00499 if ((ret = ast_pthread_create_background(&thread,&attr,scan_thread, NULL)) != 0) { 00500 ast_log(LOG_WARNING, "Unable to create thread :( (returned error: %d)\n", ret); 00501 return -1; 00502 } 00503 pthread_attr_destroy(&attr); 00504 return 0; 00505 }
static int remove_from_queue | ( | struct outgoing * | o, | |
const char * | status | |||
) | [static] |
Remove a call file from the outgoing queue, optionally moving it into the archive directory.
o | the pointer to outgoing struct | |
status | the exit status of the call. Can be "Completed", "Failed" or "Expired" |
Definition at line 278 of file pbx_spool.c.
References ast_log(), ast_test_flag, f, outgoing::fn, LOG_WARNING, outgoing::options, SPOOL_FLAG_ALWAYS_DELETE, and SPOOL_FLAG_ARCHIVE.
00279 { 00280 int fd; 00281 FILE *f; 00282 char newfn[256]; 00283 const char *bname; 00284 00285 if (!ast_test_flag(&o->options, SPOOL_FLAG_ALWAYS_DELETE)) { 00286 struct stat current_file_status; 00287 00288 if (!stat(o->fn, ¤t_file_status)) 00289 if (time(NULL) < current_file_status.st_mtime) 00290 return 0; 00291 } 00292 00293 if (!ast_test_flag(&o->options, SPOOL_FLAG_ARCHIVE)) { 00294 unlink(o->fn); 00295 return 0; 00296 } 00297 if (mkdir(qdonedir, 0700) && (errno != EEXIST)) { 00298 ast_log(LOG_WARNING, "Unable to create queue directory %s -- outgoing spool archiving disabled\n", qdonedir); 00299 unlink(o->fn); 00300 return -1; 00301 } 00302 fd = open(o->fn, O_WRONLY|O_APPEND); 00303 if (fd > -1) { 00304 f = fdopen(fd, "a"); 00305 if (f) { 00306 fprintf(f, "Status: %s\n", status); 00307 fclose(f); 00308 } else 00309 close(fd); 00310 } 00311 00312 bname = strrchr(o->fn,'/'); 00313 if (bname == NULL) 00314 bname = o->fn; 00315 else 00316 bname++; 00317 snprintf(newfn, sizeof(newfn), "%s/%s", qdonedir, bname); 00318 /* a existing call file the archive dir is overwritten */ 00319 unlink(newfn); 00320 if (rename(o->fn, newfn) != 0) { 00321 unlink(o->fn); 00322 return -1; 00323 } else 00324 return 0; 00325 }
static void safe_append | ( | struct outgoing * | o, | |
time_t | now, | |||
char * | s | |||
) | [static] |
Definition at line 251 of file pbx_spool.c.
References ast_log(), ast_mainpid, f, outgoing::fn, LOG_WARNING, outgoing::retries, and outgoing::retrytime.
Referenced by attempt_thread(), and scan_service().
00252 { 00253 int fd; 00254 FILE *f; 00255 struct utimbuf tbuf; 00256 fd = open(o->fn, O_WRONLY|O_APPEND); 00257 if (fd > -1) { 00258 f = fdopen(fd, "a"); 00259 if (f) { 00260 fprintf(f, "%s: %ld %d (%ld)\n", s, (long)ast_mainpid, o->retries, (long) now); 00261 fclose(f); 00262 } else 00263 close(fd); 00264 /* Update the file time */ 00265 tbuf.actime = now; 00266 tbuf.modtime = now + o->retrytime; 00267 if (utime(o->fn, &tbuf)) 00268 ast_log(LOG_WARNING, "Unable to set utime on %s: %s\n", o->fn, strerror(errno)); 00269 } 00270 }
static int scan_service | ( | char * | fn, | |
time_t | now, | |||
time_t | atime | |||
) | [static] |
Definition at line 373 of file pbx_spool.c.
References apply_outgoing(), ast_log(), ast_mainpid, f, free_outgoing(), init_outgoing(), launch_service(), LOG_DEBUG, LOG_EVENT, LOG_WARNING, malloc, remove_from_queue(), and safe_append().
Referenced by scan_thread().
00374 { 00375 struct outgoing *o; 00376 FILE *f; 00377 o = malloc(sizeof(struct outgoing)); 00378 if (o) { 00379 init_outgoing(o); 00380 f = fopen(fn, "r+"); 00381 if (f) { 00382 if (!apply_outgoing(o, fn, f)) { 00383 #if 0 00384 printf("Filename: %s, Retries: %d, max: %d\n", fn, o->retries, o->maxretries); 00385 #endif 00386 fclose(f); 00387 if (o->retries <= o->maxretries) { 00388 now += o->retrytime; 00389 if (o->callingpid && (o->callingpid == ast_mainpid)) { 00390 safe_append(o, time(NULL), "DelayedRetry"); 00391 ast_log(LOG_DEBUG, "Delaying retry since we're currently running '%s'\n", o->fn); 00392 free_outgoing(o); 00393 } else { 00394 /* Increment retries */ 00395 o->retries++; 00396 /* If someone else was calling, they're presumably gone now 00397 so abort their retry and continue as we were... */ 00398 if (o->callingpid) 00399 safe_append(o, time(NULL), "AbortRetry"); 00400 00401 safe_append(o, now, "StartRetry"); 00402 launch_service(o); 00403 } 00404 return now; 00405 } else { 00406 ast_log(LOG_EVENT, "Queued call to %s/%s expired without completion after %d attempt%s\n", o->tech, o->dest, o->retries - 1, ((o->retries - 1) != 1) ? "s" : ""); 00407 free_outgoing(o); 00408 remove_from_queue(o, "Expired"); 00409 return 0; 00410 } 00411 } else { 00412 free_outgoing(o); 00413 ast_log(LOG_WARNING, "Invalid file contents in %s, deleting\n", fn); 00414 fclose(f); 00415 remove_from_queue(o, "Failed"); 00416 } 00417 } else { 00418 free_outgoing(o); 00419 ast_log(LOG_WARNING, "Unable to open %s: %s, deleting\n", fn, strerror(errno)); 00420 remove_from_queue(o, "Failed"); 00421 } 00422 } else 00423 ast_log(LOG_WARNING, "Out of memory :(\n"); 00424 return -1; 00425 }
static void* scan_thread | ( | void * | unused | ) | [static] |
Definition at line 427 of file pbx_spool.c.
References ast_log(), last, LOG_WARNING, and scan_service().
Referenced by load_module().
00428 { 00429 struct stat st; 00430 DIR *dir; 00431 struct dirent *de; 00432 char fn[256]; 00433 int res; 00434 time_t last = 0, next = 0, now; 00435 for(;;) { 00436 /* Wait a sec */ 00437 sleep(1); 00438 time(&now); 00439 if (!stat(qdir, &st)) { 00440 if ((st.st_mtime != last) || (next && (now > next))) { 00441 #if 0 00442 printf("atime: %ld, mtime: %ld, ctime: %ld\n", st.st_atime, st.st_mtime, st.st_ctime); 00443 printf("Ooh, something changed / timeout\n"); 00444 #endif 00445 next = 0; 00446 last = st.st_mtime; 00447 dir = opendir(qdir); 00448 if (dir) { 00449 while((de = readdir(dir))) { 00450 snprintf(fn, sizeof(fn), "%s/%s", qdir, de->d_name); 00451 if (!stat(fn, &st)) { 00452 if (S_ISREG(st.st_mode)) { 00453 if (st.st_mtime <= now) { 00454 res = scan_service(fn, now, st.st_atime); 00455 if (res > 0) { 00456 /* Update next service time */ 00457 if (!next || (res < next)) { 00458 next = res; 00459 } 00460 } else if (res) 00461 ast_log(LOG_WARNING, "Failed to scan service '%s'\n", fn); 00462 } else { 00463 /* Update "next" update if necessary */ 00464 if (!next || (st.st_mtime < next)) 00465 next = st.st_mtime; 00466 } 00467 } 00468 } else 00469 ast_log(LOG_WARNING, "Unable to stat %s: %s\n", fn, strerror(errno)); 00470 } 00471 closedir(dir); 00472 } else 00473 ast_log(LOG_WARNING, "Unable to open directory %s: %s\n", qdir, strerror(errno)); 00474 } 00475 } else 00476 ast_log(LOG_WARNING, "Unable to stat %s\n", qdir); 00477 } 00478 return NULL; 00479 }
static int unload_module | ( | void | ) | [static] |
char qdir[255] [static] |
Definition at line 66 of file pbx_spool.c.
char qdonedir[255] [static] |
Definition at line 67 of file pbx_spool.c.