FOSSology  3.2.0rc1
Open Source License Compliance by Open Source Software
job.c
Go to the documentation of this file.
1 /* **************************************************************
2 Copyright (C) 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3 Copyright (C) 2015 Siemens AG
4 
5 This program is free software; you can redistribute it and/or
6 modify it under the terms of the GNU General Public License
7 version 2 as published by the Free Software Foundation.
8 
9 This program is distributed in the hope that it will be useful,
10 but WITHOUT ANY WARRANTY; without even the implied warranty of
11 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12 GNU General Public License for more details.
13 
14 You should have received a copy of the GNU General Public License along
15 with this program; if not, write to the Free Software Foundation, Inc.,
16 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
17 ************************************************************** */
23 /* local includes */
24 #include <libfossrepo.h>
25 #include <agent.h>
26 #include <database.h>
27 #include <job.h>
28 #include <scheduler.h>
29 
30 /* std library includes */
31 #include <stdlib.h>
32 
33 /* unix library includes */
34 #include <sys/types.h>
35 #include <unistd.h>
36 #include <sys/time.h>
37 #include <sys/resource.h>
38 
39 /* other library includes */
40 #include <glib.h>
41 #include <gio/gio.h>
42 
/*
 * Argument guards for job functions. If the job pointer is NULL they set
 * errno to EINVAL, log an error and return from the enclosing function:
 * TEST_NULV for void functions, TEST_NULL returning `ret` otherwise.
 * They are deliberately left as bare if-statements (no do { } while(0))
 * so that call sites written with or without a trailing ';' keep compiling.
 */
#define TEST_NULV(j) if(!(j)) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return; }
#define TEST_NULL(j, ret) if(!(j)) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return ret; }
/* 512-byte size constant, presumably for SQL statement buffers. No trailing
 * ';' so it can be used inside expressions such as an array bound. */
#define MAX_SQL 512
46 
47 /* ************************************************************************** */
48 /* **** Locals ************************************************************** */
49 /* ************************************************************************** */
50 
55 #define SELECT_STRING(passed) MK_STRING_LIT(JOB_##passed),
56 const char* job_status_strings[] = { JOB_STATUS_TYPES(SELECT_STRING) };
57 #undef SELECT_STRING
58 
71 static int is_active(int* job_id, job_t* job, int* counter)
72 {
73  if((job->running_agents != NULL || job->failed_agents != NULL) || job->id < 0)
74  ++(*counter);
75  return 0;
76 }
77 
91 static int job_sstatus(int* job_id, job_t* job, GOutputStream* ostr)
92 {
93  gchar* status_str = g_strdup_printf(
94  "job:%d status:%s type:%s, priority:%d running:%d finished:%d failed:%d\n",
95  job->id,
96  job_status_strings[job->status],
97  job->agent_type,
98  job->priority,
99  g_list_length(job->running_agents),
100  g_list_length(job->finished_agents),
101  g_list_length(job->failed_agents));
102 
103  V_JOB("JOB_STATUS: %s", status_str);
104  g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
105 
106  if(*job_id == 0)
107  g_list_foreach(job->running_agents, (GFunc)agent_print_status, ostr);
108 
109  g_free(status_str);
110  return 0;
111 }
112 
120 static void job_transition(scheduler_t* scheduler, job_t* job, job_status new_status)
121 {
122  /* book keeping */
123  TEST_NULV(job);
124  V_JOB("JOB[%d]: job status changed: %s => %s\n",
125  job->id, job_status_strings[job->status], job_status_strings[new_status]);
126 
127  /* change the job status */
128  job->status = new_status;
129 
130  /* only update database for real jobs */
131  if(job->id >= 0)
132  database_update_job(scheduler, job, new_status);
133 }
134 
146 static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
147 {
148  return ((job_t*)a)->priority - ((job_t*)b)->priority;
149 }
150 
151 /* ************************************************************************** */
152 /* **** Constructor Destructor ********************************************** */
153 /* ************************************************************************** */
154 
175 job_t* job_init(GTree* job_list, GSequence* job_queue,
176  char* type, char* host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
177 {
178  job_t* job = g_new0(job_t, 1);
179 
180  job->agent_type = g_strdup(type);
181  job->required_host = g_strdup(host);
182  job->running_agents = NULL;
183  job->finished_agents = NULL;
184  job->failed_agents = NULL;
185  job->log = NULL;
186  job->status = JB_CHECKEDOUT;
187  job->data = NULL;
188  job->db_result = NULL;
189  job->lock = NULL;
190  job->idx = 0;
191  job->message = NULL;
192  job->priority = priority;
193  job->verbose = 0;
194  job->parent_id = parent_id;
195  job->id = id;
196  job->user_id = user_id;
197  job->group_id = group_id;
198  job->jq_cmd_args = g_strdup(jq_cmd_args);
199 
200  g_tree_insert(job_list, &job->id, job);
201  if(id >= 0) g_sequence_insert_sorted(job_queue, job, job_compare, NULL);
202  return job;
203 }
204 
212 void job_destroy(job_t* job)
213 {
214  TEST_NULV(job);
215 
216  if(job->db_result != NULL)
217  {
218  SafePQclear(job->db_result);
219 
220  // Lock the mutex to prevent clearing locked mutex
221  g_mutex_lock(job->lock);
222  g_mutex_unlock(job->lock);
223 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
224  g_mutex_clear(job->lock);
225 #else
226  g_mutex_free(job->lock);
227 #endif
228  }
229 
230  if(job->log)
231  log_destroy(job->log);
232 
233  g_list_free(job->running_agents);
234  g_list_free(job->finished_agents);
235  g_list_free(job->failed_agents);
236  g_free(job->message);
237  g_free(job->agent_type);
238  g_free(job->required_host);
239  g_free(job->data);
240  if (job->jq_cmd_args) g_free(job->jq_cmd_args);
241  g_free(job);
242 }
243 
244 /* ************************************************************************** */
245 /* **** Events ************************************************************** */
246 /* ************************************************************************** */
247 
255 void job_verbose_event(scheduler_t* scheduler, job_t* job)
256 {
257  GList* iter;
258 
259  TEST_NULV(job);
260  for(iter = job->running_agents; iter != NULL; iter = iter->next)
261  aprintf(iter->data, "VERBOSE %d\n", job->verbose);
262 }
263 
277 void job_status_event(scheduler_t* scheduler, arg_int* params)
278 {
279  const char end[] = "end\n";
280  GError* error = NULL;
281 
282  int tmp = 0;
283  char buf[1024];
284 
285  if(!params->second)
286  {
287  memset(buf, '\0', sizeof(buf));
288  sprintf(buf, "scheduler:%d revision:%s daemon:%d jobs:%d log:%s port:%d verbose:%d\n",
289  scheduler->s_pid, fo_config_get(scheduler->sysconfig, "BUILD", "COMMIT_HASH", &error),
290  scheduler->s_daemon, g_tree_nnodes(scheduler->job_list), main_log->log_name,
291  scheduler->i_port, verbose);
292 
293  g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
294  g_tree_foreach(scheduler->job_list, (GTraverseFunc)job_sstatus, params->first);
295  }
296  else
297  {
298  job_t* stat = g_tree_lookup(scheduler->job_list, &params->second);
299  if(stat)
300  {
301  job_sstatus(&tmp, g_tree_lookup(scheduler->job_list, &params->second), params->first);
302  }
303  else
304  {
305  sprintf(buf, "ERROR: invalid job id = %d\n", params->second);
306  g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
307  }
308  }
309 
310  g_output_stream_write(params->first, end, sizeof(end), NULL, NULL);
311  g_free(params);
312 }
313 
323 void job_pause_event(scheduler_t* scheduler, arg_int* params)
324 {
325  job_t tmp_job;
326  job_t* job = params->first;
327  GList* iter;
328 
329  // if the job doesn't exist, create a fake
330  if(params->first == NULL)
331  {
332  tmp_job.id = params->second;
333  tmp_job.status = JB_NOT_AVAILABLE;
334  tmp_job.running_agents = NULL;
335  tmp_job.message = NULL;
336 
337  job = &tmp_job;
338  }
339 
340  job_transition(scheduler, job, JB_PAUSED);
341  for(iter = job->running_agents; iter != NULL; iter = iter->next)
342  agent_pause(iter->data);
343 
344  g_free(params);
345 }
346 
353 void job_restart_event(scheduler_t* scheduler, arg_int* params)
354 {
355  job_t tmp_job;
356  job_t* job = params->first;
357  GList* iter;
358 
359  // if the job doesn't exist, create a fake
360  if(job == NULL)
361  {
362  tmp_job.id = params->second;
363  tmp_job.status = JB_PAUSED;
364  tmp_job.running_agents = NULL;
365  tmp_job.message = NULL;
366 
367  event_signal(database_update_event, NULL);
368  job = &tmp_job;
369  }
370 
371  if(job->status != JB_PAUSED)
372  {
373  ERROR("attempt to restart job %d failed, job status was %s",
374  job->id, job_status_strings[job->status]);
375  g_free(params);
376  return;
377  }
378 
379  for(iter = job->running_agents; iter != NULL; iter = iter->next)
380  {
381  if(job->db_result != NULL) agent_write(iter->data, "OK\n", 3);
382  agent_unpause(iter->data);
383  }
384 
385  job_transition(scheduler, job, JB_RESTART);
386  g_free(params);
387 }
388 
396 void job_priority_event(scheduler_t* scheduler, arg_int* params)
397 {
398  GList* iter;
399 
400  database_job_priority(scheduler, params->first, params->second);
401  ((job_t*)params->first)->priority = params->second;
402  for(iter = ((job_t*)params->first)->running_agents; iter; iter = iter->next)
403  setpriority(PRIO_PROCESS, ((agent_t*)iter->data)->pid, params->second);
404  g_free(params);
405 }
406 
417 void job_fail_event(scheduler_t* scheduler, job_t* job)
418 {
419  GList* iter;
420 
421  if(job->status != JB_FAILED)
422  job_transition(scheduler, job, JB_FAILED);
423 
424  for(iter = job->running_agents; iter != NULL; iter = iter->next)
425  {
426  V_JOB("JOB[%d]: job failed, killing agents\n", job->id);
427  agent_kill(iter->data);
428  }
429 }
430 
431 /* ************************************************************************** */
432 /* **** Functions *********************************************************** */
433 /* ************************************************************************** */
434 
445 void job_add_agent(job_t* job, void* agent)
446 {
447  TEST_NULV(job);
448  TEST_NULV(agent);
449  job->running_agents = g_list_append(job->running_agents, agent);
450 }
451 
460 void job_remove_agent(job_t* job, GTree* job_list, void* agent)
461 {
462  GList* curr;
463  TEST_NULV(job);
464 
465  if(job->finished_agents && agent)
466  job->finished_agents = g_list_remove(job->finished_agents, agent);
467 
468  if(job->finished_agents == NULL && (job->status == JB_COMPLETE || job->status == JB_FAILED))
469  {
470  V_JOB("JOB[%d]: job removed from system\n", job->id);
471 
472  for(curr = job->running_agents; curr != NULL; curr = curr->next)
473  ((agent_t*)curr->data)->owner = NULL;
474  for(curr = job->failed_agents; curr != NULL; curr = curr->next)
475  ((agent_t*)curr->data)->owner = NULL;
476  for(curr = job->finished_agents; curr != NULL; curr = curr->next)
477  ((agent_t*)curr->data)->owner = NULL;
478 
479  g_tree_remove(job_list, &job->id);
480  }
481 }
482 
489 void job_finish_agent(job_t* job, void* agent)
490 {
491  TEST_NULV(job);
492  TEST_NULV(agent);
493 
494  job->running_agents = g_list_remove(job->running_agents, agent);
495  job->finished_agents = g_list_append(job->finished_agents, agent);
496 }
497 
504 void job_fail_agent(job_t* job, void* agent)
505 {
506  TEST_NULV(job);
507  TEST_NULV(agent);
508  job->running_agents = g_list_remove(job->running_agents, agent);
509  job->failed_agents = g_list_append(job->failed_agents, agent);
510 }
511 
522 void job_set_data(scheduler_t* scheduler, job_t* job, char* data, int sql)
523 {
524  job->data = g_strdup(data);
525  job->idx = 0;
526 
527  if(sql)
528  {
529  // TODO
530  //j->db_result = PQexec(db_conn, j->data);
531  //j->lock = g_mutex_new();
532  }
533 }
534 
542 void job_update(scheduler_t* scheduler, job_t* job)
543 {
544  GList* iter;
545  int finished = 1;
546 
547  TEST_NULV(job)
548 
549  for(iter = job->running_agents; iter != NULL; iter = iter->next)
550  if(((agent_t*)iter->data)->status != AG_PAUSED)
551  finished = 0;
552 
553  if(job->status != JB_PAUSED && job->status != JB_COMPLETE && finished)
554  {
555  if(job->failed_agents == NULL)
556  {
557  job_transition(scheduler, job, JB_COMPLETE);
558  for(iter = job->finished_agents; iter != NULL; iter = iter->next)
559  {
560  aprintf(iter->data, "CLOSE\n");
561  }
562  }
563  /* this indicates a failed agent */
564  else
565  {
566  g_list_free(job->failed_agents);
567  job->failed_agents = NULL;
568  job->message = NULL;
569  job_fail_event(scheduler, job);
570  }
571  }
572 }
573 
581 int job_is_open(scheduler_t* scheduler, job_t* job)
582 {
583  /* local */
584  int retval = 0;
585 
586  TEST_NULL(job, -1);
587 
588  /* check to make sure that the job status is correct */
589  if(job->status == JB_CHECKEDOUT)
590  job_transition(scheduler, job, JB_STARTED);
591 
592  /* check to see if we even need to worry about sql stuff */
593  if(job->db_result == NULL)
594  return (job->idx == 0 && job->data != NULL);
595 
596  g_mutex_lock(job->lock);
597  if(job->idx < PQntuples(job->db_result))
598  {
599  retval = 1;
600  }
601  else
602  {
603  SafePQclear(job->db_result);
604  job->db_result = database_exec(scheduler, job->data);
605  job->idx = 0;
606 
607  retval = PQntuples(job->db_result) != 0;
608  }
609 
610  g_mutex_unlock(job->lock);
611  return retval;
612 }
613 
621 char* job_next(job_t* job)
622 {
623  char* retval = NULL;
624 
625  TEST_NULL(job, NULL);
626  if(job->db_result == NULL)
627  {
628  job->idx = 1;
629  return job->data;
630  }
631 
632  g_mutex_lock(job->lock);
633 
634  if(job->idx < PQntuples(job->db_result))
635  retval = PQgetvalue(job->db_result, job->idx++, 0);
636 
637  g_mutex_unlock(job->lock);
638  return retval;
639 }
640 
649 {
650  FILE* file;
651  gchar* file_name;
652  gchar* file_path;
653 
654  if(job->id < 0)
655  return main_log;
656 
657  if(job->log)
658  return job->log;
659 
660  file_name = g_strdup_printf("%06d", job->id);
661  file_path = fo_RepMkPath("logs", file_name);
662 
663  if((file = fo_RepFwrite("logs", file_name)) == NULL)
664  {
665  ERROR("JOB[%d]: job unable to create log file: %s\n", job->id, file_path);
666  g_free(file_name);
667  free(file_path);
668  return NULL;
669  }
670 
671  V_JOB("JOB[%d]: job created log file:\n %s\n", job->id, file_path);
672  database_job_log(job->id, file_path);
673  job->log = log_new_FILE(file, file_name, job->agent_type, 0);
674 
675  g_free(file_name);
676  free(file_path);
677  return job->log;
678 }
679 
680 /* ************************************************************************** */
681 /* **** Job list Functions ************************************************** */
682 /* ************************************************************************** */
683 
692 job_t* next_job(GSequence* job_queue)
693 {
694  job_t* retval = NULL;
695  GSequenceIter* beg = g_sequence_get_begin_iter(job_queue);
696 
697  if(g_sequence_get_length(job_queue) != 0)
698  {
699  retval = g_sequence_get(beg);
700  g_sequence_remove(beg);
701  }
702 
703  return retval;
704 }
705 
712 job_t* peek_job(GSequence* job_queue)
713 {
714  GSequenceIter* beg;
715 
716  if(g_sequence_get_length(job_queue) == 0)
717  {
718  return NULL;
719  }
720 
721  beg = g_sequence_get_begin_iter(job_queue);
722  return g_sequence_get(beg);
723 }
724 
731 uint32_t active_jobs(GTree* job_list)
732 {
733  int count = 0;
734  g_tree_foreach(job_list, (GTraverseFunc)is_active, &count);
735  return count;
736 }
737 
void job_verbose_event(scheduler_t *scheduler, job_t *job)
Definition: job.c:255
GMutex * lock
Lock to maintain data integrity.
Definition: job.h:76
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
Definition: job.c:581
#define ERROR(...)
Definition: logging.h:90
int32_t user_id
The id of the user that created the job.
Definition: job.h:85
FILE * fo_RepFwrite(char *Type, char *Filename)
Perform an fwrite. Also creates directories.
Definition: libfossrepo.c:722
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
Definition: agent.c:1204
fo_conf * sysconfig
Configuration information loaded from the configuration file.
Definition: scheduler.h:160
void job_fail_agent(job_t *job, void *agent)
Definition: job.c:504
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job.
Definition: job.c:323
void log_destroy(log_t *log)
Free memory associated with the log file.
Definition: logging.c:161
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:852
Definition: logging.h:45
gchar * jq_cmd_args
Command line arguments for this job.
Definition: job.h:74
uint16_t i_port
The port that the scheduler is listening on.
Definition: scheduler.h:177
void database_update_job(scheduler_t *scheduler, job_t *job, job_status status)
Change the status of a job in the database.
Definition: database.c:956
void database_job_log(int j_id, char *log_name)
Enters the name of the log file for a job into the database.
Definition: database.c:1017
int32_t verbose
The verbose level for all of the agents in this job.
Definition: job.h:82
int32_t priority
Importance of the job; maps directly to Unix priority.
Definition: job.h:81
log_t * log_new_FILE(FILE *log_file, gchar *log_name, gchar *pro_name, pid_t pro_pid)
Creates a log file structure based on an already created FILE*.
Definition: logging.c:137
PGresult * database_exec(scheduler_t *scheduler, const char *sql)
Executes an sql statement for the scheduler.
Definition: database.c:814
PGresult * db_result
Results from the sql query (if any)
Definition: job.h:75
char * fo_config_get(fo_conf *conf, const char *group, const char *key, GError **error)
Gets an element based on its group name and key name. If the group or key is not found, the error object is set and NULL is returned.
Definition: fossconfig.c:341
void job_set_data(scheduler_t *scheduler, job_t *job, char *data, int sql)
Definition: job.c:522
char * required_host
If not NULL, this job must run on a specific host machine.
Definition: job.h:65
log_t * job_log(job_t *job)
Definition: job.c:648
char * job_next(job_t *job)
Definition: job.c:621
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:183
uint32_t active_jobs(GTree *job_list)
Gets the number of jobs that are not paused.
Definition: job.c:731
void job_destroy(job_t *job)
Definition: job.c:212
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
Definition: job.c:460
gboolean s_daemon
Is the scheduler being run as a daemon.
Definition: scheduler.h:155
#define SafePQclear(pgres)
Definition: scheduler.h:140
ssize_t agent_write(agent_t *agent, const void *buf, int count)
Definition: agent.c:1285
job_t * peek_job(GSequence *job_queue)
Gets the job that is at the top of the queue if there is one.
Definition: job.c:712
#define SELECT_STRING(passed)
Definition: job.c:55
int aprintf(agent_t *agent, const char *fmt,...)
Definition: agent.c:1249
static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
Definition: job.c:146
void job_restart_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:353
void job_update(scheduler_t *scheduler, job_t *job)
Definition: job.c:542
GList * running_agents
The list of agents assigned to this job that are still working.
Definition: job.h:66
int verbose
The verbose flag for the cli.
Definition: fo_cli.c:49
#define TEST_NULV(a)
Test if parameter is NULL.
Definition: agent.c:58
log_t * main_log
Definition: logging.c:44
void agent_unpause(agent_t *agent)
Definition: agent.c:1188
void agent_pause(agent_t *agent)
Definition: agent.c:1175
char * agent_type
The type of agent used to analyze the data.
Definition: job.h:64
job_t * next_job(GSequence *job_queue)
Gets the next job from the job queue.
Definition: job.c:692
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
Definition: job.c:445
void job_finish_agent(job_t *job, void *agent)
Definition: job.c:489
static int job_sstatus(int *job_id, job_t *job, GOutputStream *ostr)
Prints the jobs status to the output stream.
Definition: job.c:91
Definition: agent.h:110
int32_t parent_id
The identifier for the parent of this job (its queue id)
Definition: job.h:83
GList * finished_agents
The list of agents that have completed their tasks.
Definition: job.h:67
char * fo_RepMkPath(const char *Type, char *Filename)
Given a filename, construct the full path to the file.
Definition: libfossrepo.c:364
gchar * data
The data associated with this job (jq_args)
Definition: job.h:73
#define TEST_NULL(a, ret)
Test if parameter is NULL.
Definition: agent.c:68
gchar * log_name
The name of the log file that will be printed to.
Definition: logging.h:47
static void job_transition(scheduler_t *scheduler, job_t *job, job_status new_status)
Definition: job.c:120
static int is_active(int *job_id, job_t *job, int *counter)
Tests if a job is active.
Definition: job.c:71
int32_t group_id
The id of the group that created the job.
Definition: job.h:86
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Definition: agent.c:1234
Header file with agent related operations.
gchar * message
Message that will be sent with job notification email.
Definition: job.h:80
The job structure.
Definition: job.h:61
log_t * log
The log to print any agent logging messages to.
Definition: job.h:69
Definition: event.h:56
void job_priority_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:396
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job.
Definition: job.c:277
gboolean s_pid
The pid of the scheduler process.
Definition: scheduler.h:154
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
Definition: job.c:175
void database_job_priority(scheduler_t *scheduler, job_t *job, int priority)
Changes the priority of a job queue entry in the database.
Definition: database.c:1032
job_status status
The current status for the job.
Definition: job.h:72
Header file for the scheduler.
uint32_t idx
The current index into the sql results.
Definition: job.h:77
void job_fail_event(scheduler_t *scheduler, job_t *job)
Events that causes a job to be marked a failed.
Definition: job.c:417
GList * failed_agents
The list of agents that failed while working.
Definition: job.h:68
int32_t id
The identifier for this job.
Definition: job.h:84