FOSSology  3.2.0rc1
Open Source License Compliance by Open Source Software
agent.c
Go to the documentation of this file.
1 /* **************************************************************
2  Copyright (C) 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3  Copyright (C) 2015 Siemens AG
4 
5  This program is free software; you can redistribute it and/or
6  modify it under the terms of the GNU General Public License
7  version 2 as published by the Free Software Foundation.
8 
9  This program is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License along
15  with this program; if not, write to the Free Software Foundation, Inc.,
16  51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17  ************************************************************** */
18 
26 /* local includes */
27 #include <agent.h>
28 #include <database.h>
29 #include <event.h>
30 #include <host.h>
31 #include <job.h>
32 #include <logging.h>
33 #include <scheduler.h>
34 
35 /* library includes */
36 #include <limits.h>
37 #include <stdio.h>
38 #include <stdlib.h>
39 #include <string.h>
40 #include <time.h>
41 
42 /* unix library includes */
43 #include <fcntl.h>
44 #include <limits.h>
45 #include <sys/types.h>
46 #include <sys/wait.h>
47 #include <unistd.h>
48 
49 /* other library includes */
50 #include <glib.h>
51 
/**
 * Guard for functions returning void: when `a` is NULL (false), sets errno to
 * EINVAL, logs an error and returns from the enclosing function.
 *
 * The expansion is wrapped in do { } while(0) so that it is a single statement
 * that cannot capture an `else` following the call site, and the argument is
 * parenthesized so that an expression may be passed safely.
 *
 * @param a  the pointer to test
 */
#define TEST_NULV(a) do { if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return; } } while(0)

/**
 * Guard for functions returning a value: when `a` is NULL (false), sets errno
 * to EINVAL, logs an error and returns `ret` from the enclosing function.
 *
 * @param a    the pointer to test
 * @param ret  the value returned from the enclosing function when `a` is NULL
 */
#define TEST_NULL(a, ret) do { if(!(a)) { \
  errno = EINVAL; ERROR("agent passed is NULL, cannot proceed"); return ret; } } while(0)
70 
/**
 * Prints the "JOB[id].type[pid.host]: " prefix to the scheduler log.
 * Expects a variable named `agent` to be in scope at the call site.
 */
#define AGENT_CREDENTIAL \
  log_printf("JOB[%d].%s[%d.%s]: ", agent->owner->id, agent->type->name, \
      agent->pid, agent->host->name)

/**
 * Prints the same prefix as AGENT_CREDENTIAL, but to the log of the job that
 * owns the agent. Expects a variable named `agent` to be in scope.
 */
#define AGENT_LOG_CREDENTIAL \
  con_printf(job_log(agent->owner), "JOB[%d].%s[%d.%s]: ", \
      agent->owner->id, agent->type->name, agent->pid, agent->host->name)

/** Logs an error (file, line, agent prefix, message); never filtered. */
#define AGENT_ERROR(...) do { \
  log_printf("ERROR: %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } while(0)

/*
 * The conditional macros below keep their test INSIDE the do { } while(0)
 * wrapper. That makes every expansion one complete statement, so a call
 * placed in an unbraced if/else cannot steal the caller's `else`.
 */

/** Logs a note with the agent prefix when TEST_NOTIFY is true. */
#define AGENT_NOTIFY(...) do { if(TEST_NOTIFY) { \
  log_printf("NOTE: "); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); \
  log_printf("\n"); } } while(0)

/** Logs a warning (file, line, agent prefix) when TEST_WARNING is true. */
#define AGENT_WARNING(...) do { if(TEST_WARNING) { \
  log_printf("WARNING %s.%d: ", __FILE__, __LINE__); \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); } \
  if(TEST_WARNING) { log_printf("\n"); } } while(0)

/** Logs a message with the agent prefix when TVERB_AGENT is true. */
#define AGENT_SEQUENTIAL_PRINT(...) do { if(TVERB_AGENT) { \
  AGENT_CREDENTIAL; \
  log_printf(__VA_ARGS__); } } while(0)

/** Logs a message with the agent prefix to the owning job's log. */
#define AGENT_CONCURRENT_PRINT(...) do { \
  AGENT_LOG_CREDENTIAL; \
  con_printf(job_log(agent->owner), __VA_ARGS__); } while(0)
111 
112 /* ************************************************************************** */
113 /* **** Data Types ********************************************************** */
114 /* ************************************************************************** */
115 
120 #define SELECT_STRING(passed) MK_STRING_LIT(AGENT_##passed),
121 const char* agent_status_strings[] =
123 #undef SELECT_STRING
124 
125 /* ************************************************************************** */
126 /* **** Local Functions ***************************************************** */
127 /* ************************************************************************** */
128 
140 static int agent_close_fd(int* pid_ptr, agent_t* agent, agent_t* excepted)
141 {
142  TEST_NULL(agent, 0);
143  if (agent != excepted)
144  {
145  close(agent->from_child);
146  close(agent->to_child);
147  fclose(agent->read);
148  fclose(agent->write);
149  }
150  return 0;
151 }
152 
163 static int update(int* pid_ptr, agent_t* agent, gpointer unused)
164 {
165  TEST_NULL(agent, 0);
166  int nokill = is_agent_special(agent, SAG_NOKILL) || is_meta_special(agent->type, SAG_NOKILL);
167 
168  if (agent->status == AG_SPAWNED || agent->status == AG_RUNNING || agent->status == AG_PAUSED)
169  {
170  /* check last checkin time */
171  if (time(NULL) - agent->check_in > CONF_agent_death_timer && !(agent->owner->status == JB_PAUSED) && !nokill)
172  {
173  AGENT_CONCURRENT_PRINT("no heartbeat for %d seconds\n", (time(NULL) - agent->check_in));
174  agent_kill(agent);
175  return 0;
176  }
177 
178  /* check items processed */
179  if (agent->status != AG_PAUSED && !agent->alive)
180  {
181  agent->n_updates++;
182  }
183  else
184  {
185  agent->n_updates = 0;
186  }
187  if (agent->n_updates > CONF_agent_update_number && !nokill)
188  {
189  AGENT_CONCURRENT_PRINT("agent has not set the alive flag in at least 10 minutes, killing\n");
190  agent_kill(agent);
191  return 0;
192  }
193 
194  AGENT_SEQUENTIAL_PRINT("agent updated correctly, processed %d items: %d\n", agent->total_analyzed,
195  agent->n_updates);
196  agent->alive = 0;
197  }
198 
199  return 0;
200 }
201 
213 static int agent_kill_traverse(int* pid, agent_t* agent, gpointer unused)
214 {
215  agent_kill(agent);
216  return FALSE;
217 }
218 
228 static int agent_list(char* name, meta_agent_t* ma, GOutputStream* ostr)
229 {
230  if (ma->valid)
231  {
232  g_output_stream_write(ostr, name, strlen(name), NULL, NULL);
233  g_output_stream_write(ostr, " ", 1, NULL, NULL);
234  }
235  return FALSE;
236 }
237 
249 static int agent_test(const gchar* name, meta_agent_t* ma, scheduler_t* scheduler)
250 {
251  static int32_t id_gen = -1;
252 
253  GList* iter;
254  host_t* host;
255  char *jq_cmd_args = 0;
256 
257  for (iter = scheduler->host_queue; iter != NULL; iter = iter->next)
258  {
259  host = (host_t*) iter->data;
260  V_AGENT("META_AGENT[%s] testing on HOST[%s]\n", ma->name, host->name);
261  job_t* job = job_init(scheduler->job_list, scheduler->job_queue, ma->name, host->name, id_gen--, 0, 0, 0, 0, jq_cmd_args);
262  agent_init(scheduler, host, job);
263  }
264 
265  return 0;
266 }
267 
275 static void agent_listen(scheduler_t* scheduler, agent_t* agent)
276 {
277  /* static locals */
278 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
279  static GMutex version_lock;
280 #else
281  static GStaticMutex version_lock = G_STATIC_MUTEX_INIT;
282 #endif
283 
284  /* locals */
285  char buffer[1024]; // buffer to store c strings read from agent
286  GMatchInfo* match; // regex match information
287  char* arg; // used during regex retrievals
288  int relevant; // used during special retrievals
289 
290  TEST_NULV(agent);
291 
302  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
303  {
304  AGENT_CONCURRENT_PRINT("pipe from child closed: %s\n", strerror(errno));
305  g_thread_exit(NULL);
306  }
307 
308  /* check to make sure "VERSION" was sent */
309  buffer[strlen(buffer) - 1] = '\0';
310  if (strncmp(buffer, "VERSION: ", 9) != 0)
311  {
312  if (strncmp(buffer, "@@@1", 4) == 0)
313  {
314  THREAD_FATAL(job_log(agent->owner), "agent crashed before sending version information");
315  }
316  else
317  {
318  agent->type->valid = 0;
319  agent_fail_event(scheduler, agent);
320  agent_kill(agent);
321  con_printf(main_log, "ERROR %s.%d: agent %s.%s has been invalidated, removing from agents\n", __FILE__, __LINE__,
322  agent->host->name, agent->type->name);
323  AGENT_CONCURRENT_PRINT("agent didn't send version information: \"%s\"\n", buffer);
324  return;
325  }
326  }
327 
328  /* check that the VERSION information is correct */
329 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
330  g_mutex_lock(&version_lock);
331 #else
332  g_static_mutex_lock(&version_lock);
333 #endif
334  strcpy(buffer, &buffer[9]);
335  if (agent->type->version == NULL && agent->type->valid)
336  {
337  agent->type->version_source = agent->host->name;
338  agent->type->version = g_strdup(buffer);
339  if (TVERB_AGENT)
340  con_printf(main_log, "META_AGENT[%s.%s] version is: \"%s\"\n", agent->host->name, agent->type->name,
341  agent->type->version);
342  }
343  else if (strcmp(agent->type->version, buffer) != 0)
344  {
345  con_printf(job_log(agent->owner), "ERROR %s.%d: META_DATA[%s] invalid agent spawn check\n", __FILE__, __LINE__,
346  agent->type->name);
347  con_printf(job_log(agent->owner), "ERROR: versions don't match: %s(%s) != received: %s(%s)\n",
348  agent->type->version_source, agent->type->version, agent->host->name, buffer);
349  agent->type->valid = 0;
350  agent_kill(agent);
351 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
352  g_mutex_unlock(&version_lock);
353 #else
354  g_static_mutex_unlock(&version_lock);
355 #endif
356  return;
357  }
358 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
359  g_mutex_unlock(&version_lock);
360 #else
361  g_static_mutex_unlock(&version_lock);
362 #endif
363 
373  while (1)
374  {
375  /* get message from agent */
376  if (fgets(buffer, sizeof(buffer), agent->read) == NULL)
377  g_thread_exit(NULL);
378 
379  buffer[strlen(buffer) - 1] = '\0';
380 
381  if (strlen(buffer) == 0)
382  continue;
383 
384  if (TVERB_AGENT && (TVERB_SPECIAL || strncmp(buffer, "SPECIAL", 7) != 0))
385  AGENT_CONCURRENT_PRINT("received: \"%s\"\n", buffer);
386 
394  if (strncmp(buffer, "BYE", 3) == 0)
395  {
396  if ((agent->return_code = atoi(&(buffer[4]))) != 0)
397  {
398  AGENT_CONCURRENT_PRINT("agent failed with error code %d\n", agent->return_code);
399  event_signal(agent_fail_event, agent);
400  }
401  break;
402  }
403 
410  if (strncmp(buffer, "@@@1", 4) == 0)
411  break;
412 
419  if (strncmp(buffer, "@@@0", 4) == 0 && agent->updated)
420  {
421  aprintf(agent, "%s\n", agent->data);
422  aprintf(agent, "END\n");
423  fflush(agent->write);
424  agent->updated = 0;
425  continue;
426  }
427 
428  /* agent just checked in */
429  agent->check_in = time(NULL);
430 
439  if (strncmp(buffer, "OK", 2) == 0)
440  {
441  if (agent->status != AG_PAUSED)
442  event_signal(agent_ready_event, agent);
443  }
444 
452  else if (strncmp(buffer, "HEART", 5) == 0)
453  {
454  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
455 
456  arg = g_match_info_fetch(match, 3);
457  agent->total_analyzed = atoi(arg);
458  g_free(arg);
459 
460  arg = g_match_info_fetch(match, 6);
461  agent->alive = (arg[0] == '1' || agent->alive);
462  g_free(arg);
463 
464  g_match_info_free(match);
465  match = NULL;
466 
468  }
469 
476  else if (strncmp(buffer, "EMAIL", 5) == 0)
477  {
478  agent->owner->message = g_strdup(buffer + 6);
479  }
480 
488  else if (strncmp(buffer, "SPECIAL", 7) == 0)
489  {
490  relevant = INT_MAX;
491 
492  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
493 
494  arg = g_match_info_fetch(match, 3);
495  relevant &= atoi(arg);
496  g_free(arg);
497 
498  arg = g_match_info_fetch(match, 6);
499  if (atoi(arg))
500  {
501  if (agent->special & relevant)
502  relevant = 0;
503  }
504  else
505  {
506  if (!(agent->special & relevant))
507  relevant = 0;
508  }
509  g_free(arg);
510 
511  g_match_info_free(match);
512 
513  agent->special ^= relevant;
514  }
515 
521  else if (strncmp(buffer, "GETSPECIAL", 10) == 0)
522  {
523  g_regex_match(scheduler->parse_agent_msg, buffer, 0, &match);
524 
525  arg = g_match_info_fetch(match, 3);
526  relevant = atoi(arg);
527  g_free(arg);
528 
529  if (agent->special & relevant)
530  aprintf(agent, "VALUE: 1\n");
531  else
532  aprintf(agent, "VALUE: 0\n");
533 
534  g_match_info_free(match);
535  }
536 
542  else if (!(TVERB_AGENT))
543  AGENT_CONCURRENT_PRINT("\"%s\"\n", buffer);
544  }
545 
546  if (TVERB_AGENT)
547  AGENT_CONCURRENT_PRINT("communication thread closing\n");
548 }
549 
562 static void shell_parse(char* confdir, int user_id, int group_id, char* input, char *jq_cmd_args, int jobId, int* argc, char*** argv)
563 {
564  char* begin;
565  char* curr;
566  int idx = 0;
567 #define MAX_CMD_ARGS 30
568 
569  *argv = g_new0(char*, MAX_CMD_ARGS);
570  begin = NULL;
571 
572  for (curr = input; *curr; curr++)
573  {
574  if (*curr == ' ')
575  {
576  if (begin == NULL)
577  continue;
578 
579  if (*begin == '"')
580  continue;
581 
582  *curr = '\0';
583  (*argv)[idx++] = g_strdup(begin);
584  begin = NULL;
585  }
586  else if (begin == NULL)
587  {
588  begin = curr;
589  }
590  else if (*begin == '"' && *curr == '"')
591  {
592  *begin = '\0';
593  *curr = '\0';
594 
595  (*argv)[idx++] = g_strdup(begin + 1);
596  begin = NULL;
597  }
598  if (idx > MAX_CMD_ARGS - 7)
599  break;
600  }
601 
602  (*argv)[idx++] = g_strdup_printf("--jobId=%d", jobId);
603  (*argv)[idx++] = g_strdup_printf("--config=%s", confdir);
604  (*argv)[idx++] = g_strdup_printf("--userID=%d", user_id);
605  (*argv)[idx++] = g_strdup_printf("--groupID=%d", group_id);
606  (*argv)[idx++] = "--scheduler_start";
607  if (jq_cmd_args)
608  (*argv)[idx++] = jq_cmd_args;
609  (*argc) = idx;
610 }
611 
615 typedef struct
616 {
620 
640 static void* agent_spawn(agent_spawn_args* pass)
641 {
642  /* locals */
643  scheduler_t* scheduler = pass->scheduler;
644  agent_t* agent = pass->agent;
645  gchar* tmp; // pointer to temporary string
646  gchar** args; // the arguments that will be passed to the child
647  int argc; // the number of arguments parsed
648  int len;
649  char buffer[2048]; // character buffer
650 
651  /* spawn the new process */
652  while ((agent->pid = fork()) < 0)
653  sleep(rand() % CONF_fork_backoff_time);
654 
655  /* we are in the child */
656  if (agent->pid == 0)
657  {
658  /* set the child's stdin and stdout to use the pipes */
659  dup2(agent->from_parent, fileno(stdin));
660  dup2(agent->to_parent, fileno(stdout));
661  dup2(agent->to_parent, fileno(stderr));
662 
663  /* close all the unnecessary file descriptors */
664  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_close_fd, agent);
665  close(agent->from_child);
666  close(agent->to_child);
667 
668  /* set the priority of the process to the job's priority */
669  if (nice(agent->owner->priority) == -1)
670  ERROR("unable to correctly set priority of agent process %d", agent->pid);
671 
672  /* if host is null, the agent will run locally to */
673  /* run the agent locally, use the commands that */
674  /* were parsed when the meta_agent was created */
675  if (strcmp(agent->host->address, LOCAL_HOST) == 0)
676  {
677  shell_parse(scheduler->sysconfigdir, agent->owner->user_id, agent->owner->group_id,
678  agent->type->raw_cmd, agent->owner->jq_cmd_args,
679  agent->owner->parent_id, &argc, &args);
680 
681  tmp = args[0];
682  args[0] = g_strdup_printf(AGENT_BINARY, scheduler->sysconfigdir,
683  AGENT_CONF, agent->type->name, tmp);
684 
685  strcpy(buffer, args[0]);
686  *strrchr(buffer, '/') = '\0';
687  if (chdir(buffer) != 0)
688  {
689  ERROR("unable to change working directory: %s\n", strerror(errno));
690  }
691 
692  execv(args[0], args);
693  }
694  /* otherwise the agent will be started using ssh */
695  /* if the agent is started using ssh we don't need */
696  /* to fully parse the arguments, just pass the run */
697  /* command as the last argument to the ssh command */
698  else
699  {
700  args = g_new0(char*, 5);
701  len = snprintf(buffer, sizeof(buffer), AGENT_BINARY " --userID=%d --groupID=%d --scheduler_start --jobId=%d",
702  agent->host->agent_dir, AGENT_CONF, agent->type->name, agent->type->raw_cmd,
703  agent->owner->user_id, agent->owner->group_id, agent->owner->parent_id);
704 
705  if (len>=sizeof(buffer)) {
706  *(buffer + sizeof(buffer) - 1) = '\0';
707  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: truncated buffer: \"%s\"",
708  __FILE__, __LINE__, agent->owner->id, agent->owner->agent_type, buffer);
709 
710  exit(5);
711  }
712 
713  args[0] = "/usr/bin/ssh";
714  args[1] = agent->host->address;
715  args[2] = buffer;
716  args[3] = agent->owner->jq_cmd_args;
717  args[4] = NULL;
718  execv(args[0], args);
719  }
720 
721  /* If we reach here, the exec call has failed */
722  log_printf("ERROR %s.%d: JOB[%d.%s]: exec failed: pid = %d, errno = \"%s\"", __FILE__, __LINE__, agent->owner->id,
723  agent->owner->agent_type, getpid(), strerror(errno));
724  }
725  /* we are in the parent */
726  else
727  {
728  event_signal(agent_create_event, agent);
729  agent_listen(scheduler, agent);
730  }
731 
732  return NULL;
733 }
734 
735 /* ************************************************************************** */
736 /* **** Constructor Destructor ********************************************** */
737 /* ************************************************************************** */
738 
755 meta_agent_t* meta_agent_init(char* name, char* cmd, int max, int spc)
756 {
757  /* locals */
758  meta_agent_t* ma;
759 
760  /* test inputs */
761  if (!name || !cmd)
762  {
763  ERROR("invalid arguments passed to meta_agent_init()");
764  return NULL;
765  }
766 
767  /* confirm valid inputs */
768  if (strlen(name) > MAX_NAME || strlen(cmd) > MAX_CMD)
769  {
770  log_printf("ERROR failed to load %s meta agent", name);
771  return NULL;
772  }
773 
774  /* inputs are valid, create the meta_agent */
775  ma = g_new0(meta_agent_t, 1);
776 
777  strcpy(ma->name, name);
778  strcpy(ma->raw_cmd, cmd);
779  strcat(ma->raw_cmd, " --scheduler_start");
780  ma->max_run = max;
781  ma->run_count = 0;
782  ma->special = spc;
783  ma->version = NULL;
784  ma->valid = TRUE;
785 
786  return ma;
787 }
788 
796 {
797  TEST_NULV(ma);
798  g_free(ma->version);
799  g_free(ma);
800 }
801 
813 agent_t* agent_init(scheduler_t* scheduler, host_t* host, job_t* job)
814 {
815  /* local variables */
816  agent_t* agent;
817  int child_to_parent[2];
818  int parent_to_child[2];
819  agent_spawn_args* pass;
820 
821  /* check job input */
822  if (!job)
823  {
824  log_printf("ERROR %s.%d: NULL job passed to agent init\n", __FILE__, __LINE__);
825  log_printf("ERROR: no other information available\n");
826  return NULL;
827  }
828 
829  /* check that the agent type exists */
830  if (g_tree_lookup(scheduler->meta_agents, job->agent_type) == NULL)
831  {
832  log_printf("ERROR %s.%d: jq_pk %d jq_type %s does not match any module in mods-enabled\n", __FILE__, __LINE__,
833  job->id, job->agent_type);
834  job->message = NULL;
835  job_fail_event(scheduler, job);
836  job_remove_agent(job, scheduler->job_list, NULL);
837  return NULL;
838  }
839 
840  /* allocate memory and do trivial assignments */
841  agent = g_new(agent_t, 1);
842  agent->type = g_tree_lookup(scheduler->meta_agents, job->agent_type);
843  agent->status = AG_CREATED;
844 
845  /* make sure that there is a metaagent for the job */
846  if (agent->type == NULL)
847  {
848  ERROR("meta agent %s does not exist", job->agent_type);
849  return NULL;
850  }
851 
852  /* check if the agent is valid */
853  if (!agent->type->valid)
854  {
855  ERROR("agent %s has been invalidated by version information", job->agent_type);
856  return NULL;
857  }
858 
859  /* create the pipes between the child and the parent */
860  if (pipe(parent_to_child) != 0)
861  {
862  ERROR("JOB[%d.%s] failed to create parent to child pipe", job->id, job->agent_type);
863  g_free(agent);
864  return NULL;
865  }
866  if (pipe(child_to_parent) != 0)
867  {
868  ERROR("JOB[%d.%s] failed to create child to parent pipe", job->id, job->agent_type);
869  g_free(agent);
870  return NULL;
871  }
872 
873  /* set file identifiers to correctly talk to children */
874  agent->from_parent = parent_to_child[0];
875  agent->to_child = parent_to_child[1];
876  agent->from_child = child_to_parent[0];
877  agent->to_parent = child_to_parent[1];
878 
879  /* initialize other info */
880  agent->host = host;
881  agent->owner = job;
882  agent->updated = 0;
883  agent->n_updates = 0;
884  agent->data = NULL;
885  agent->return_code = -1;
886  agent->total_analyzed = 0;
887  agent->special = 0;
888 
889  /* open the relevant file pointers */
890  if ((agent->read = fdopen(agent->from_child, "r")) == NULL)
891  {
892  ERROR("JOB[%d.%s] failed to initialize read file", job->id, job->agent_type);
893  g_free(agent);
894  return NULL;
895  }
896  if ((agent->write = fdopen(agent->to_child, "w")) == NULL)
897  {
898  ERROR("JOB[%d.%s] failed to initialize write file", job->id, job->agent_type);
899  g_free(agent);
900  return NULL;
901  }
902 
903  /* increase the load on the host and count of running agents */
904  if (agent->owner->id > 0)
905  {
906  host_increase_load(agent->host);
908  }
909 
910  /* spawn the listen thread */
911  pass = g_new0(agent_spawn_args, 1);
912  pass->scheduler = scheduler;
913  pass->agent = agent;
914 
915 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
916  agent->thread = g_thread_new(agent->type->name, (GThreadFunc) agent_spawn, pass);
917 #else
918  agent->thread = g_thread_create((GThreadFunc)agent_spawn, pass, 1, NULL);
919 #endif
920 
921  return agent;
922 }
923 
935 void agent_destroy(agent_t* agent)
936 {
937  TEST_NULV(agent);
938 
939  /* close all of the files still open for this agent */
940  close(agent->from_child);
941  close(agent->to_child);
942  close(agent->from_parent);
943  close(agent->to_parent);
944  fclose(agent->write);
945  fclose(agent->read);
946 
947  /* release the child process */
948  g_free(agent);
949 }
950 
951 /* ************************************************************************** */
952 /* **** Events ************************************************************** */
953 /* ************************************************************************** */
954 
963 void agent_death_event(scheduler_t* scheduler, pid_t* pid)
964 {
965  agent_t* agent;
966  int status = pid[1];
967 
968  if ((agent = g_tree_lookup(scheduler->agents, &pid[0])) == NULL)
969  {
970  ERROR("invalid agent death event: pid[%d]", pid[0]);
971  return;
972  }
973 
974  if (agent->owner->id >= 0)
975  event_signal(database_update_event, NULL);
976 
977  if (write(agent->to_parent, "@@@1\n", 5) != 5)
978  AGENT_SEQUENTIAL_PRINT("write to agent unsuccessful: %s\n", strerror(errno));
979  g_thread_join(agent->thread);
980 
981  if (agent->return_code != 0)
982  {
983  if (WIFEXITED(status))
984  {
985  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", (status >> 8));
986  }
987  else if (WIFSIGNALED(status))
988  {
989  AGENT_CONCURRENT_PRINT("agent was killed by signal: %d.%s\n", WTERMSIG(status), strsignal(WTERMSIG(status)));
990  if (WCOREDUMP(status))
991  AGENT_CONCURRENT_PRINT("agent produced core dump\n");
992  }
993  else
994  {
995  AGENT_CONCURRENT_PRINT("agent failed, code: %d\n", agent->return_code);
996  }
997  AGENT_WARNING("agent closed unexpectedly, agent status was %s", agent_status_strings[agent->status]);
998  agent_fail_event(scheduler, agent);
999  }
1000 
1001  if (agent->status != AG_PAUSED && agent->status != AG_FAILED)
1002  agent_transition(agent, AG_PAUSED);
1003 
1004  job_update(scheduler, agent->owner);
1005  if (agent->status == AG_FAILED && agent->owner->id < 0)
1006  {
1007  log_printf("ERROR %s.%d: agent %s.%s has failed scheduler startup test\n", __FILE__, __LINE__, agent->host->name,
1008  agent->type->name);
1009  agent->type->valid = 0;
1010  }
1011 
1012  if (agent->owner->id < 0 && !agent->type->valid)
1013  AGENT_SEQUENTIAL_PRINT("agent failed startup test, removing from meta agents\n");
1014 
1015  AGENT_SEQUENTIAL_PRINT("successfully remove from the system\n");
1016  job_remove_agent(agent->owner, scheduler->job_list, agent);
1017  g_tree_remove(scheduler->agents, &agent->pid);
1018  g_free(pid);
1019 }
1020 
1032 void agent_create_event(scheduler_t* scheduler, agent_t* agent)
1033 {
1034  TEST_NULV(agent);
1035 
1036  AGENT_SEQUENTIAL_PRINT("agent successfully spawned\n");
1037  g_tree_insert(scheduler->agents, &agent->pid, agent);
1038  agent_transition(agent, AG_SPAWNED);
1039  job_add_agent(agent->owner, agent);
1040 }
1041 
1053 void agent_ready_event(scheduler_t* scheduler, agent_t* agent)
1054 {
1055  int ret;
1056 
1057  TEST_NULV(agent);
1058  if (agent->status == AG_SPAWNED)
1059  {
1060  agent_transition(agent, AG_RUNNING);
1061  AGENT_SEQUENTIAL_PRINT("agent successfully created\n");
1062  }
1063 
1064  if ((ret = job_is_open(scheduler, agent->owner)) == 0)
1065  {
1066  agent_transition(agent, AG_PAUSED);
1067  job_finish_agent(agent->owner, agent);
1068  job_update(scheduler, agent->owner);
1069  return;
1070  }
1071  else if (ret < 0)
1072  {
1073  agent_transition(agent, AG_FAILED);
1074  return;
1075  }
1076  else
1077  {
1078  agent->data = job_next(agent->owner);
1079  agent->updated = 1;
1080  }
1081 
1082  if (write(agent->to_parent, "@@@0\n", 5) != 5)
1083  {
1084  AGENT_ERROR("failed sending new data to agent");
1085  agent_kill(agent);
1086  }
1087 }
1088 
1098 void agent_update_event(scheduler_t* scheduler, void* unused)
1099 {
1100  g_tree_foreach(scheduler->agents, (GTraverseFunc) update, NULL);
1101 }
1102 
1113 void agent_fail_event(scheduler_t* scheduler, agent_t* agent)
1114 {
1115  TEST_NULV(agent);
1116  agent_transition(agent, AG_FAILED);
1117  job_fail_agent(agent->owner, agent);
1118  if (write(agent->to_parent, "@@@1\n", 5) != 5)
1119  AGENT_ERROR("Failed to kill agent thread cleanly");
1120 }
1121 
1129 void list_agents_event(scheduler_t* scheduler, GOutputStream* ostr)
1130 {
1131  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_list, ostr);
1132  g_output_stream_write(ostr, "\nend\n", 5, NULL, NULL);
1133 }
1134 
1135 /* ************************************************************************** */
1136 /* **** Modifier Functions ************************************************** */
1137 /* ************************************************************************** */
1138 
1147 void agent_transition(agent_t* agent, agent_status new_status)
1148 {
1149  AGENT_SEQUENTIAL_PRINT("agent status change: %s -> %s\n", agent_status_strings[agent->status],
1150  agent_status_strings[new_status]);
1151 
1152  if (agent->owner->id > 0)
1153  {
1154  if (agent->status == AG_PAUSED)
1155  {
1156  host_increase_load(agent->host);
1158  }
1159  if (new_status == AG_PAUSED)
1160  {
1161  host_decrease_load(agent->host);
1163  }
1164  }
1165 
1166  agent->status = new_status;
1167 }
1168 
1175 void agent_pause(agent_t* agent)
1176 {
1177  kill(agent->pid, SIGSTOP);
1178  agent_transition(agent, AG_PAUSED);
1179 }
1180 
/* NOTE(review): the line with the function header that belongs to this body
 * (1188 in the embedded numbering) is absent from this copy of the file, so
 * the name and the signature cannot be confirmed from here. The body sends
 * SIGCONT to the process of `agent` and then sets the agent's status to
 * AG_RUNNING through agent_transition(). */
1189 {
1190  kill(agent->pid, SIGCONT);
1191  agent_transition(agent, AG_RUNNING);
1192 }
1193 
1204 void agent_print_status(agent_t* agent, GOutputStream* ostr)
1205 {
1206  gchar* status_str;
1207  char time_buf[64];
1208  struct tm* time_info;
1209 
1210  TEST_NULV(agent);
1211  TEST_NULV(ostr);
1212 
1213  strcpy(time_buf, "(none)");
1214  time_info = localtime(&agent->check_in);
1215  if (time_info)
1216  strftime(time_buf, sizeof(time_buf), "%F %T", localtime(&agent->check_in));
1217  status_str = g_strdup_printf("agent:%d host:%s type:%s status:%s time:%s\n", agent->pid, agent->host->name,
1218  agent->type->name, agent_status_strings[agent->status], time_buf);
1219 
1220  AGENT_SEQUENTIAL_PRINT("AGENT_STATUS: %s", status_str);
1221  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
1222  g_free(status_str);
1223  return;
1224 }
1225 
1234 void agent_kill(agent_t* agent)
1235 {
1236  AGENT_SEQUENTIAL_PRINT("KILL: sending SIGKILL to pid %d\n", agent->pid);
1238  kill(agent->pid, SIGKILL);
1239 }
1240 
1249 int aprintf(agent_t* agent, const char* fmt, ...)
1250 {
1251  va_list args;
1252  int rc;
1253  char* tmp;
1254 
1255  va_start(args, fmt);
1256  if (TVERB_AGENT)
1257  {
1258  tmp = g_strdup_vprintf(fmt, args);
1259  tmp[strlen(tmp) - 1] = '\0';
1260  AGENT_CONCURRENT_PRINT("sent to agent \"%s\"\n", tmp);
1261  rc = fprintf(agent->write, "%s\n", tmp);
1262  g_free(tmp);
1263  }
1264  else
1265  {
1266  rc = vfprintf(agent->write, fmt, args);
1267  }
1268  va_end(args);
1269  fflush(agent->write);
1270 
1271  return rc;
1272 }
1273 
1285 ssize_t agent_write(agent_t* agent, const void* buf, int count)
1286 {
1287  return write(agent->to_parent, buf, count);
1288 }
1289 
1290 /* ************************************************************************** */
1291 /* **** static functions and meta agents ************************************ */
1292 /* ************************************************************************** */
1293 
1301 void test_agents(scheduler_t* scheduler)
1302 {
1303  g_tree_foreach(scheduler->meta_agents, (GTraverseFunc) agent_test, scheduler);
1304 }
1305 
1312 void kill_agents(scheduler_t* scheduler)
1313 {
1314  g_tree_foreach(scheduler->agents, (GTraverseFunc) agent_kill_traverse, NULL);
1315 }
1316 
1327 int add_meta_agent(GTree* meta_agents, char* name, char* cmd, int max, int spc)
1328 {
1329  meta_agent_t* ma;
1330 
1331  if (name == NULL)
1332  return 0;
1333 
1334  if (g_tree_lookup(meta_agents, name) == NULL)
1335  {
1336  if ((ma = meta_agent_init(name, cmd, max, spc)) == NULL)
1337  return 0;
1338  g_tree_insert(meta_agents, ma->name, ma);
1339  return 1;
1340  }
1341 
1342  return 0;
1343 }
1344 
1352 int is_meta_special(meta_agent_t* ma, int special_type)
1353 {
1354  return (ma != NULL) && ((ma->special & special_type) != 0);
1355 }
1356 
1364 int is_agent_special(agent_t* agent, int special_type)
1365 {
1366  return (agent != NULL) && ((agent->special & special_type) != 0);
1367 }
1368 
1374 {
1375  ma->run_count++;
1376  V_AGENT("AGENT[%s] run increased to %d\n", ma->name, ma->run_count);
1377 }
1378 
1384 {
1385  ma->run_count--;
1386  V_AGENT("AGENT[%s] run decreased to %d\n", ma->name, ma->run_count);
1387 }
void meta_agent_increase_count(meta_agent_t *ma)
Definition: agent.c:1373
gboolean alive
flag to tell the scheduler if the agent is still alive
Definition: agent.h:136
uint8_t n_updates
keeps track of the number of times the agent has updated
Definition: agent.h:120
char name[256]
the name associated with this agent i.e. nomos, copyright...
Definition: agent.h:94
#define AGENT_WARNING(...)
Definition: agent.c:96
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:581
meta_agent_t * meta_agent_init(char *name, char *cmd, int max, int spc)
Creates a new meta agent.
Definition: agent.c:755
GTree * meta_agents
List of all meta agents available to the scheduler.
Definition: scheduler.h:167
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
Definition: agent.c:1129
#define ERROR(...)
Definition: logging.h:90
int32_t user_id
The id of the user that created the job.
Definition: job.h:85
static int agent_test(const gchar *name, meta_agent_t *ma, scheduler_t *scheduler)
GTraversalFunction that tests the current agent on every host.
Definition: agent.c:249
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1204
int add_meta_agent(GTree *meta_agents, char *name, char *cmd, int max, int spc)
Definition: agent.c:1327
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:504
Store the results of a regex match.
Definition: scanners.hpp:39
scheduler_t * scheduler
Reference to current scheduler state.
Definition: agent.c:617
int is_meta_special(meta_agent_t *ma, int special_type)
tests if a particular meta agent has a specific special flag set
Definition: agent.c:1352
agent_t * agent
Reference to current agent state.
Definition: agent.c:618
uint64_t total_analyzed
the total number that this agent has analyzed
Definition: agent.h:135
void agent_death_event(scheduler_t *scheduler, pid_t *pid)
Definition: agent.c:963
static int agent_list(char *name, meta_agent_t *ma, GOutputStream *ostr)
GTraverseFunction that will print the name of every agent in alphabetical order separated by spaces...
Definition: agent.c:228
int run_count
the count of agents in running state
Definition: agent.h:101
#define AGENT_STATUS_TYPES(apply)
Definition: agent.h:61
GList * host_queue
Round-robin queue for choosing which host use next.
Definition: scheduler.h:172
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:852
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:74
#define AGENT_CONF
Agent conf location.
Definition: scheduler.h:134
host_t * host
the host that this agent will start on
Definition: agent.h:114
#define SAG_NOKILL
This agent should not be killed when updating the agent.
Definition: agent.h:44
GThread * thread
the thread that communicates with this agent
Definition: agent.h:118
gboolean updated
boolean flag to indicate if the scheduler has updated the data
Definition: agent.h:134
int32_t priority
Importance of the job, maps directly to unix priority.
Definition: job.h:81
gchar * sysconfigdir
The system directory that contains fossology.conf.
Definition: scheduler.h:161
int jobId
The id of the job.
#define AGENT_ERROR(...)
Definition: agent.c:82
int max_run
the maximum number that can run at once, or -1 if there is no limit
Definition: agent.h:96
GSequence * job_queue
heap of jobs that still need to be started
Definition: scheduler.h:184
meta_agent_t * type
the type of agent this is i.e. bucket, copyright...
Definition: agent.h:113
Log related operations.
void agent_fail_event(scheduler_t *scheduler, agent_t *agent)
Fails an agent.
Definition: agent.c:1113
void agent_transition(agent_t *agent, agent_status new_status)
Definition: agent.c:1147
void meta_agent_decrease_count(meta_agent_t *ma)
Definition: agent.c:1383
char * address
The address of the host, used by ssh when starting a new agent.
Definition: host.h:40
char * name
The name of the host, used to store host internally to scheduler.
Definition: host.h:39
#define AGENT_CONCURRENT_PRINT(...)
Definition: agent.c:108
void meta_agent_destroy(meta_agent_t *ma)
Definition: agent.c:795
int to_parent
file identifier to print to the parent (child stdout)
Definition: agent.h:127
void kill_agents(scheduler_t *scheduler)
Call the agent_kill function for every agent within the system.
Definition: agent.c:1312
#define AGENT_SEQUENTIAL_PRINT(...)
Definition: agent.c:103
log_t * job_log(job_t *job)
Definition: job.c:648
time_t check_in
the time that the agent last generated anything
Definition: agent.h:119
char * job_next(job_t *job)
Definition: job.c:621
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:183
agent_t * agent_init(scheduler_t *scheduler, host_t *host, job_t *job)
Allocate and spawn a new agent.
Definition: agent.c:813
void agent_ready_event(scheduler_t *scheduler, agent_t *agent)
Event created when an agent is ready for more data.
Definition: agent.c:1053
void agent_destroy(agent_t *agent)
Frees the memory associated with an agent.
Definition: agent.c:935
#define SELECT_STRING(passed)
Definition: agent.c:120
int from_parent
file identifier to read from the parent (child stdin)
Definition: agent.h:124
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:460
uint8_t return_code
what was returned by the agent when it disconnected
Definition: agent.h:137
char * version_source
the machine that reported the version information
Definition: agent.h:98
void host_increase_load(host_t *host)
Increase the number of running agents on a host by 1.
Definition: host.c:114
#define AGENT_BINARY
Format to get agent binary.
Definition: scheduler.h:133
static int agent_kill_traverse(int *pid, agent_t *agent, gpointer unused)
GTraversalFunction that kills all of the agents.
Definition: agent.c:213
FILE * write
FILE* that abstracts the use of the to_child socket.
Definition: agent.h:129
agent_status status
the state of execution the agent is currently in
Definition: agent.h:117
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1285
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1249
void agent_create_event(scheduler_t *scheduler, agent_t *agent)
Event created when a new agent has been created.
Definition: agent.c:1032
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:542
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
Definition: libfossagent.c:309
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:58
static void shell_parse(char *confdir, int user_id, int group_id, char *input, char *jq_cmd_args, int jobId, int *argc, char ***argv)
Parses the shell command that is found in the configuration file.
Definition: agent.c:562
#define MAX_CMD
the size of the agent's command buffer (arbitrary)
Definition: agent.h:37
log_t * main_log
Definition: logging.c:44
void agent_unpause(agent_t *agent)
Definition: agent.c:1188
void agent_pause(agent_t *agent)
Definition: agent.c:1175
GTree * agents
List of any currently running agents.
Definition: scheduler.h:168
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:64
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:445
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:489
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:83
Definition: agent.h:110
int from_child
file identifier to read from child
Definition: agent.h:126
char raw_cmd[MAX_CMD+1]
the raw command that will start the agent, used for ssh
Definition: agent.h:95
char buffer[2048]
The last thing received from the scheduler.
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:68
void agent_update_event(scheduler_t *scheduler, void *unused)
Definition: agent.c:1098
pid_t pid
the pid of the process this agent is running in
Definition: agent.h:121
int32_t group_id
The id of the group that created the job.
Definition: job.h:86
Event handling operations.
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Definition: agent.c:1234
void database_job_processed(int j_id, int num)
Updates the number of items that a job queue entry has processed.
Definition: database.c:1003
int valid
flag indicating if the meta_agent is valid
Definition: agent.h:100
static void agent_listen(scheduler_t *scheduler, agent_t *agent)
Definition: agent.c:275
Header file with agent related operations.
int special
any special condition associated with the agent
Definition: agent.h:97
Definition: host.h:38
gchar * message
Message that will be sent with job notification email.
Definition: job.h:80
int to_child
file identifier to print to the child
Definition: agent.h:125
GRegex * parse_agent_msg
Parses messages coming from the agents.
Definition: scheduler.h:197
The job structure.
Definition: job.h:61
FILE * read
FILE* that abstracts the use of the from_child socket.
Definition: agent.h:128
#define THREAD_FATAL(file,...)
Definition: logging.h:82
uint32_t special
any special flags that the agent has set
Definition: agent.h:138
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:175
char * agent_dir
The location on the host machine where the executables are.
Definition: host.h:41
static int agent_close_fd(int *pid_ptr, agent_t *agent, agent_t *excepted)
This will close all of the agent's pipes.
Definition: agent.c:140
int is_agent_special(agent_t *agent, int special_type)
tests if a particular agent has a specific special flag set
Definition: agent.c:1364
gchar * data
the data that has been sent to the agent for analysis
Definition: agent.h:133
job_status status
The current status for the job.
Definition: job.h:72
Header file for the scheduler.
void test_agents(scheduler_t *scheduler)
Calls the agent test function for every type of agent.
Definition: agent.c:1301
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:417
#define MAX_NAME
the size of the agent's name buffer (arbitrary)
Definition: agent.h:38
void host_decrease_load(host_t *host)
Decrease the number of running agents on a host by 1.
Definition: host.c:125
static void * agent_spawn(agent_spawn_args *pass)
Spawns a new agent using the command passed in using the meta agent.
Definition: agent.c:640
static int update(int *pid_ptr, agent_t *agent, gpointer unused)
Definition: agent.c:163
char * version
the version of the agent that is running on all hosts
Definition: agent.h:99
job_t * owner
the job that this agent is assigned to
Definition: agent.h:132
int32_t id
The identifier for this job.
Definition: job.h:84