45 #include <sys/types.h> 58 #define TEST_NULV(a) if(!a) { \ 59 errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; } 68 #define TEST_NULL(a, ret) if(!a) { \ 69 errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; } 72 #define AGENT_CREDENTIAL \ 73 log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, agent->type->name, \ 74 agent->pid, agent->host->name) 77 #define AGENT_LOG_CREDENTIAL \ 78 con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \ 79 agent->owner->id, agent->type->name, agent->pid, agent->host->name) 82 #define AGENT_ERROR(...) do { \ 83 log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \ 85 log_printf(__VA_ARGS__); \ 86 log_printf("\n"); } while(0) 89 #define AGENT_NOTIFY(...) if(TEST_NOTIFY) do { \ 90 log_printf("NOTE: "); \ 92 log_printf(__VA_ARGS__); \ 93 log_printf("\n"); } while(0) 96 #define AGENT_WARNING(...) if(TEST_WARNING) do { \ 97 log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \ 99 log_printf(__VA_ARGS__); \ 100 log_printf("\n"); } while(0) 103 #define AGENT_SEQUENTIAL_PRINT(...) if(TVERB_AGENT) do { \ 105 log_printf(__VA_ARGS__); } while(0) 108 #define AGENT_CONCURRENT_PRINT(...) do { \ 109 AGENT_LOG_CREDENTIAL; \ 110 con_printf(job_log(agent->owner), __VA_ARGS__); } while(0) 120 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed), 121 const char* agent_status_strings[] =
143 if (agent != excepted)
148 fclose(agent->
write);
168 if (agent->
status == AG_SPAWNED || agent->
status == AG_RUNNING || agent->
status == AG_PAUSED)
171 if (time(NULL) - agent->
check_in > CONF_agent_death_timer && !(agent->
owner->
status == JB_PAUSED) && !nokill)
187 if (agent->
n_updates > CONF_agent_update_number && !nokill)
232 g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
233 g_output_stream_write(ostr,
" ", 1, NULL, NULL);
251 static int32_t id_gen = -1;
255 char *jq_cmd_args = 0;
257 for (iter = scheduler->
host_queue; iter != NULL; iter = iter->next)
259 host = (
host_t*) iter->data;
260 V_AGENT(
"META_AGENT[%s] testing on HOST[%s]\n", ma->
name, host->
name);
278 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32 279 static GMutex version_lock;
281 static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
302 if (fgets(buffer,
sizeof(buffer), agent->
read) == NULL)
309 buffer[strlen(buffer) - 1] =
'\0';
310 if (strncmp(buffer,
"VERSION: ", 9) != 0)
312 if (strncmp(buffer,
"@@@1", 4) == 0)
321 con_printf(
main_log,
"ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
329 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32 330 g_mutex_lock(&version_lock);
332 g_static_mutex_lock(&version_lock);
334 strcpy(buffer, &buffer[9]);
343 else if (strcmp(agent->
type->
version, buffer) != 0)
345 con_printf(
job_log(agent->
owner),
"ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
347 con_printf(
job_log(agent->
owner),
"ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
351 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32 352 g_mutex_unlock(&version_lock);
354 g_static_mutex_unlock(&version_lock);
358 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32 359 g_mutex_unlock(&version_lock);
361 g_static_mutex_unlock(&version_lock);
376 if (fgets(buffer,
sizeof(buffer), agent->
read) == NULL)
379 buffer[strlen(buffer) - 1] =
'\0';
381 if (strlen(buffer) == 0)
384 if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(buffer,
"SPECIAL", 7) != 0))
394 if (strncmp(buffer,
"BYE", 3) == 0)
396 if ((agent->
return_code = atoi(&(buffer[4]))) != 0)
410 if (strncmp(buffer,
"@@@1", 4) == 0)
419 if (strncmp(buffer,
"@@@0", 4) == 0 && agent->
updated)
423 fflush(agent->
write);
439 if (strncmp(buffer,
"OK", 2) == 0)
441 if (agent->
status != AG_PAUSED)
452 else if (strncmp(buffer,
"HEART", 5) == 0)
456 arg = g_match_info_fetch(match, 3);
460 arg = g_match_info_fetch(match, 6);
461 agent->
alive = (arg[0] ==
'1' || agent->
alive);
464 g_match_info_free(match);
476 else if (strncmp(buffer,
"EMAIL", 5) == 0)
488 else if (strncmp(buffer,
"SPECIAL", 7) == 0)
494 arg = g_match_info_fetch(match, 3);
495 relevant &= atoi(arg);
498 arg = g_match_info_fetch(match, 6);
506 if (!(agent->
special & relevant))
511 g_match_info_free(match);
521 else if (strncmp(buffer,
"GETSPECIAL", 10) == 0)
525 arg = g_match_info_fetch(match, 3);
526 relevant = atoi(arg);
534 g_match_info_free(match);
542 else if (!(TVERB_AGENT))
562 static void shell_parse(
char* confdir,
int user_id,
int group_id,
char* input,
char *jq_cmd_args,
int jobId,
int* argc,
char*** argv)
567 #define MAX_CMD_ARGS 30 569 *argv = g_new0(
char*, MAX_CMD_ARGS);
572 for (curr = input; *curr; curr++)
583 (*argv)[idx++] = g_strdup(begin);
586 else if (begin == NULL)
590 else if (*begin ==
'"' && *curr ==
'"')
595 (*argv)[idx++] = g_strdup(begin + 1);
598 if (idx > MAX_CMD_ARGS - 7)
602 (*argv)[idx++] = g_strdup_printf(
"--jobId=%d", jobId);
603 (*argv)[idx++] = g_strdup_printf(
"--config=%s", confdir);
604 (*argv)[idx++] = g_strdup_printf(
"--userID=%d", user_id);
605 (*argv)[idx++] = g_strdup_printf(
"--groupID=%d", group_id);
606 (*argv)[idx++] =
"--scheduler_start";
608 (*argv)[idx++] = jq_cmd_args;
652 while ((agent->
pid = fork()) < 0)
653 sleep(rand() % CONF_fork_backoff_time);
670 ERROR(
"unable to correctly set priority of agent process %d", agent->
pid);
685 strcpy(buffer, args[0]);
686 *strrchr(buffer,
'/') =
'\0';
687 if (chdir(buffer) != 0)
689 ERROR(
"unable to change working directory: %s\n", strerror(errno));
692 execv(args[0], args);
700 args = g_new0(
char*, 5);
701 len = snprintf(buffer,
sizeof(buffer),
AGENT_BINARY " --userID=%d --groupID=%d --scheduler_start --jobId=%d",
705 if (len>=
sizeof(buffer)) {
706 *(buffer +
sizeof(
buffer) - 1) =
'\0';
707 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
713 args[0] =
"/usr/bin/ssh";
718 execv(args[0], args);
722 log_printf(
"ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->
owner->
id,
763 ERROR(
"invalid arguments passed to meta_agent_init()");
770 log_printf(
"ERROR failed to load %s meta agent", name);
777 strcpy(ma->
name, name);
779 strcat(ma->
raw_cmd,
" --scheduler_start");
817 int child_to_parent[2];
818 int parent_to_child[2];
824 log_printf(
"ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
825 log_printf(
"ERROR: no other information available\n");
832 log_printf(
"ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
843 agent->
status = AG_CREATED;
846 if (agent->
type == NULL)
855 ERROR(
"agent %s has been invalidated by version information", job->
agent_type);
860 if (pipe(parent_to_child) != 0)
862 ERROR(
"JOB[%d.%s] failed to create parent to child pipe", job->
id, job->
agent_type);
866 if (pipe(child_to_parent) != 0)
868 ERROR(
"JOB[%d.%s] failed to create child to parent pipe", job->
id, job->
agent_type);
875 agent->
to_child = parent_to_child[1];
915 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32 918 agent->
thread = g_thread_create((GThreadFunc)agent_spawn, pass, 1, NULL);
944 fclose(agent->
write);
968 if ((agent = g_tree_lookup(scheduler->
agents, &pid[0])) == NULL)
970 ERROR(
"invalid agent death event: pid[%d]", pid[0]);
977 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
979 g_thread_join(agent->
thread);
983 if (WIFEXITED(status))
987 else if (WIFSIGNALED(status))
989 AGENT_CONCURRENT_PRINT(
"agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
990 if (WCOREDUMP(status))
997 AGENT_WARNING(
"agent closed unexpectedly, agent status was %s", agent_status_strings[agent->
status]);
1001 if (agent->
status != AG_PAUSED && agent->
status != AG_FAILED)
1007 log_printf(
"ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->
host->
name,
1017 g_tree_remove(scheduler->
agents, &agent->
pid);
1037 g_tree_insert(scheduler->
agents, &agent->
pid, agent);
1058 if (agent->
status == AG_SPAWNED)
1082 if (write(agent->
to_parent,
"@@@0\n", 5) != 5)
1100 g_tree_foreach(scheduler->
agents, (GTraverseFunc)
update, NULL);
1118 if (write(agent->
to_parent,
"@@@1\n", 5) != 5)
1119 AGENT_ERROR(
"Failed to kill agent thread cleanly");
1132 g_output_stream_write(ostr,
"\nend\n", 5, NULL, NULL);
1150 agent_status_strings[new_status]);
1154 if (agent->
status == AG_PAUSED)
1159 if (new_status == AG_PAUSED)
1166 agent->
status = new_status;
1177 kill(agent->
pid, SIGSTOP);
1190 kill(agent->
pid, SIGCONT);
1208 struct tm* time_info;
1213 strcpy(time_buf,
"(none)");
1214 time_info = localtime(&agent->
check_in);
1216 strftime(time_buf,
sizeof(time_buf),
"%F %T", localtime(&agent->
check_in));
1217 status_str = g_strdup_printf(
"agent:%d host:%s type:%s status:%s time:%s\n", agent->
pid, agent->
host->
name,
1218 agent->
type->
name, agent_status_strings[agent->
status], time_buf);
1221 g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1238 kill(agent->
pid, SIGKILL);
1255 va_start(args, fmt);
1258 tmp = g_strdup_vprintf(fmt, args);
1259 tmp[strlen(tmp) - 1] =
'\0';
1261 rc = fprintf(agent->
write,
"%s\n", tmp);
1266 rc = vfprintf(agent->
write, fmt, args);
1269 fflush(agent->
write);
1287 return write(agent->
to_parent, buf, count);
1334 if (g_tree_lookup(meta_agents, name) == NULL)
1338 g_tree_insert(meta_agents, ma->
name, ma);
1354 return (ma != NULL) && ((ma->
special & special_type) != 0);
1366 return (agent != NULL) && ((agent->
special & special_type) != 0);
1376 V_AGENT(
"AGENT[%s] run increased to %d\n", ma->
name, ma->
run_count);
1386 V_AGENT(
"AGENT[%s] run decreased to %d\n", ma->
name, ma->
run_count);
void meta_agent_increase_count(meta_agent_t *ma)
gboolean alive
flag to tell the scheduler if the agent is still alive
uint8_t n_updates
keeps track of the number of times the agent has updated
#define AGENT_WARNING(...)
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
GTree * meta_agents
List of all meta agents available to the scheduler.
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
int32_t user_id
The id of the user that created the job.
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraversalFunction that tests the current agent on every host.
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
void job_fail_agent(job_t *job, void *agent)
Store the results of a regex match.
scheduler_t * scheduler
Reference to current scheduler state.
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
agent_t * agent
Reference to current agent state.
uint64_t total_analyzed
the total number that this agent has analyzed
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunction that will print the name of every agent in alphabetical order separated by spaces...
#define AGENT_STATUS_TYPES(apply)
GList * host_queue
Round-robin queue for choosing which host use next.
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
gchar * jq_cmd_args
Command line arguments for this job.
#define AGENT_CONF
Agent conf location.
host_t * host
the host that this agent will start on
#define SAG_NOKILL
This agent should not be killed when updating the agent.
GThread * thread
the thread that communicates with this agent
gboolean updated
boolean flag to indicate if the scheduler has updated the data
int32_t priority
Importance of the job, maps directory to unix priority.
gchar * sysconfigdir
The system directory that contain fossology.conf.
int jobId
The id of the job.
GSequence * job_queue
heap of jobs that still need to be started
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
void agent_transition(agent_t *agent, agent_status new_status)
void meta_agent_decrease_count(meta_agent_t *ma)
char * address
The address of the host, used by ssh when starting a new agent.
char * name
The name of the host, used to store host internally to scheduler.
#define AGENT_CONCURRENT_PRINT(...)
void meta_agent_destroy(meta_agent_t *ma)
int to_parent
file identifier to print to the parent (child stdout)
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
#define AGENT_SEQUENTIAL_PRINT(...)
log_t * job_log(job_t *job)
time_t check_in
the time that the agent last generated anything
char * job_next(job_t *job)
GTree * job_list
List of jobs that have been created.
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
#define SELECT_STRING(passed)
int from_parent
file identifier to read from the parent (child stdin)
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
uint8_t return_code
what was returned by the agent when it disconnected
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
#define AGENT_BINARY
Format to get agent binary.
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraversalFunction that kills all of the agents.
FILE * write
FILE* that abstracts the use of the to_child socket.
agent_status status
the state of execution the agent is currently in
ssize_t agent_write(agent_t *agent, const void *buf, int count)
int aprintf(agent_t *agent, const char *fmt,...)
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
void job_update(scheduler_t *scheduler, job_t *job)
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
#define TEST_NULV(a)
Test if paramater is NULL.
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
void agent_unpause(agent_t *agent)
void agent_pause(agent_t *agent)
GTree * agents
List of any currently running agents.
char * agent_type
The type of agent used to analyze the data.
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
void job_finish_agent(job_t *job, void *agent)
int32_t parent_id
The identifier for the parent of this job (its queue id)
int from_child
file identifier to read from child
char buffer[2048]
The last thing received from the scheduler.
#define TEST_NULL(a, ret)
Test if paramater is NULL.
void agent_update_event(scheduler_t *scheduler, void *unused)
pid_t pid
the pid of the process this agent is running in
int32_t group_id
The id of the group that created the job.
Event handling operations.
void agent_kill(agent_t *agent)
Unclean kill of an agent.
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
Header file with agent related operations.
gchar * message
Message that will be sent with job notification email.
int to_child
file identifier to print to the child
GRegex * parse_agent_msg
Parses messages coming from the agents.
FILE * read
FILE* that abstracts the use of the from_child socket.
#define THREAD_FATAL(file,...)
uint32_t special
any special flags that the agent has set
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
char * agent_dir
The location on the host machine where the executables are.
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
gchar * data
the data that has been sent to the agent for analysis
job_status status
The current status for the job.
Header file for the scheduler.
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
void job_fail_event(scheduler_t *scheduler, job_t *job)
Events that causes a job to be marked a failed.
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
job_t * owner
the job that this agent is assigned to
int32_t id
The identifier for this job.