24 #include <libfossrepo.h>    34 #include <sys/types.h>    37 #include <sys/resource.h>    43 #define TEST_NULV(j) if(!j) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return; }    44 #define TEST_NULL(j, ret) if(!j) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return ret; }    45 #define MAX_SQL 512;JOB_STATUS_TYPES    55 #define SELECT_STRING(passed) MK_STRING_LIT(JOB_##passed),    56 const char* job_status_strings[] = { JOB_STATUS_TYPES(
SELECT_STRING) };
    93   gchar* status_str = g_strdup_printf(
    94       "job:%d status:%s type:%s, priority:%d running:%d finished:%d failed:%d\n",
    96       job_status_strings[job->
status],
   103   V_JOB(
"JOB_STATUS: %s", status_str);
   104   g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
   124   V_JOB(
"JOB[%d]: job status changed: %s => %s\n",
   125       job->
id, job_status_strings[job->
status], job_status_strings[new_status]);
   146 static gint 
job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
   148   return ((
job_t*)a)->priority - ((
job_t*)b)->priority;
   176     char* type, 
char* host, 
int id, 
int parent_id, 
int user_id, 
int group_id, 
int priority, 
char *jq_cmd_args)
   186   job->
status          = JB_CHECKEDOUT;
   200   g_tree_insert(job_list, &job->
id, job);
   201   if(
id >= 0) g_sequence_insert_sorted(job_queue, job, 
job_compare, NULL);
   221     g_mutex_lock(job->
lock);
   222     g_mutex_unlock(job->
lock);
   223 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32   224     g_mutex_clear(job->
lock);
   226     g_mutex_free(job->
lock);
   279   const char end[] = 
"end\n";
   280   GError* error = NULL;
   287     memset(buf, 
'\0', 
sizeof(buf));
   288     sprintf(buf, 
"scheduler:%d revision:%s daemon:%d jobs:%d log:%s port:%d verbose:%d\n",
   293     g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
   298     job_t* stat = g_tree_lookup(scheduler->
job_list, &params->second);
   305       sprintf(buf, 
"ERROR: invalid job id = %d\n", params->second);
   306       g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
   310   g_output_stream_write(params->first, end, 
sizeof(end), NULL, NULL);
   326   job_t* job = params->first;
   330   if(params->first == NULL)
   332     tmp_job.
id             = params->second;
   333     tmp_job.
status         = JB_NOT_AVAILABLE;
   356   job_t* job = params->first;
   362     tmp_job.
id             = params->second;
   363     tmp_job.
status         = JB_PAUSED;
   371   if(job->
status != JB_PAUSED)
   373     ERROR(
"attempt to restart job %d failed, job status was %s",
   374         job->
id, job_status_strings[job->
status]);
   401   ((
job_t*)params->first)->priority = params->second;
   402   for(iter = ((
job_t*)params->first)->running_agents; iter; iter = iter->next)
   403     setpriority(PRIO_PROCESS, ((
agent_t*)iter->data)->pid, params->second);
   421   if(job->
status != JB_FAILED)
   426     V_JOB(
"JOB[%d]: job failed, killing agents\n", job->
id);
   470     V_JOB(
"JOB[%d]: job removed from system\n", job->
id);
   473       ((
agent_t*)curr->data)->owner = NULL;
   474     for(curr = job->
failed_agents; curr != NULL; curr = curr->next)
   475       ((
agent_t*)curr->data)->owner = NULL;
   477       ((
agent_t*)curr->data)->owner = NULL;
   479     g_tree_remove(job_list, &job->
id);
   524   job->
data = g_strdup(data);
   550     if(((
agent_t*)iter->data)->status != AG_PAUSED)
   553   if(job->
status != JB_PAUSED && job->
status != JB_COMPLETE && finished)
   560         aprintf(iter->data, 
"CLOSE\n");
   589   if(job->
status == JB_CHECKEDOUT)
   594     return (job->
idx == 0 && job->
data != NULL);
   596   g_mutex_lock(job->
lock);
   610   g_mutex_unlock(job->
lock);
   632   g_mutex_lock(job->
lock);
   637   g_mutex_unlock(job->
lock);
   660   file_name = g_strdup_printf(
"%06d", job->
id);
   665     ERROR(
"JOB[%d]: job unable to create log file: %s\n", job->
id, file_path);
   671   V_JOB(
"JOB[%d]: job created log file:\n    %s\n", job->
id, file_path);
   694   job_t* retval = NULL;
   695   GSequenceIter* beg = g_sequence_get_begin_iter(job_queue);
   697   if(g_sequence_get_length(job_queue) != 0)
   699     retval = g_sequence_get(beg);
   700     g_sequence_remove(beg);
   716   if(g_sequence_get_length(job_queue) == 0)
   721   beg = g_sequence_get_begin_iter(job_queue);
   722   return g_sequence_get(beg);
   734   g_tree_foreach(job_list, (GTraverseFunc)
is_active, &count);
 void job_verbose_event(scheduler_t *scheduler, job_t *job)
GMutex * lock
Lock to maintain data integrity. 
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job. 
int32_t user_id
The id of the user that created the job. 
FILE * fo_RepFwrite(char *Type, char *Filename)
Perform an fwrite. Also creates directories. 
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided. 
fo_conf * sysconfig
Configuration information loaded from the configuration file. 
void job_fail_agent(job_t *job, void *agent)
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job. 
void log_destroy(log_t *log)
Free memory associated with the log file. 
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries. 
gchar * jq_cmd_args
Command line arguments for this job. 
uint16_t i_port
The port that the scheduler is listening on. 
void database_update_job(scheduler_t *scheduler, job_t *job, job_status status)
Change the status of a job in the database. 
void database_job_log(int j_id, char *log_name)
Enters the name of the log file for a job into the database. 
int32_t verbose
The verbose level for all of the agents in this job. 
int32_t priority
Importance of the job, maps directly to Unix priority.
log_t * log_new_FILE(FILE *log_file, gchar *log_name, gchar *pro_name, pid_t pro_pid)
Creates a log file structure based on an already created FILE*. 
PGresult * database_exec(scheduler_t *scheduler, const char *sql)
Executes an sql statement for the scheduler. 
PGresult * db_result
Results from the sql query (if any) 
char * fo_config_get(fo_conf *conf, const char *group, const char *key, GError **error)
Gets an element based on its group name and key name. If the group or key is not found, the error object is set and NULL is returned. 
void job_set_data(scheduler_t *scheduler, job_t *job, char *data, int sql)
char * required_host
If not NULL, this job must run on a specific host machine. 
log_t * job_log(job_t *job)
char * job_next(job_t *job)
GTree * job_list
List of jobs that have been created. 
uint32_t active_jobs(GTree *job_list)
Gets the number of jobs that are not paused. 
void job_destroy(job_t *job)
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
gboolean s_daemon
Is the scheduler being run as a daemon. 
#define SafePQclear(pgres)
ssize_t agent_write(agent_t *agent, const void *buf, int count)
job_t * peek_job(GSequence *job_queue)
Gets the job that is at the top of the queue if there is one. 
#define SELECT_STRING(passed)
int aprintf(agent_t *agent, const char *fmt,...)
static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue. 
void job_restart_event(scheduler_t *scheduler, arg_int *params)
void job_update(scheduler_t *scheduler, job_t *job)
GList * running_agents
The list of agents assigned to this job that are still working. 
int verbose
The verbose flag for the cli. 
#define TEST_NULV(a)
Test if parameter is NULL.
void agent_unpause(agent_t *agent)
void agent_pause(agent_t *agent)
char * agent_type
The type of agent used to analyze the data. 
job_t * next_job(GSequence *job_queue)
Gets the next job from the job queue. 
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the job's list of agents.
void job_finish_agent(job_t *job, void *agent)
static int job_sstatus(int *job_id, job_t *job, GOutputStream *ostr)
Prints the job's status to the output stream.
int32_t parent_id
The identifier for the parent of this job (its queue id) 
GList * finished_agents
The list of agents that have completed their tasks. 
char * fo_RepMkPath(const char *Type, char *Filename)
Given a filename, construct the full path to the file. 
gchar * data
The data associated with this job (jq_args) 
#define TEST_NULL(a, ret)
Test if parameter is NULL.
gchar * log_name
The name of the log file that will be printed to. 
static void job_transition(scheduler_t *scheduler, job_t *job, job_status new_status)
static int is_active(int *job_id, job_t *job, int *counter)
Tests if a job is active. 
int32_t group_id
The id of the group that created the job. 
void agent_kill(agent_t *agent)
Unclean kill of an agent. 
Header file with agent related operations. 
gchar * message
Message that will be sent with job notification email. 
log_t * log
The log to print any agent logging messages to. 
void job_priority_event(scheduler_t *scheduler, arg_int *params)
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job. 
gboolean s_pid
The pid of the scheduler process. 
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job. 
void database_job_priority(scheduler_t *scheduler, job_t *job, int priority)
Changes the priority of a job queue entry in the database. 
job_status status
The current status for the job. 
Header file for the scheduler. 
uint32_t idx
The current index into the sql results. 
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
GList * failed_agents
The list of agents that failed while working. 
int32_t id
The identifier for this job.