24 #include <libfossrepo.h> 34 #include <sys/types.h> 37 #include <sys/resource.h> 43 #define TEST_NULV(j) if(!j) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return; } 44 #define TEST_NULL(j, ret) if(!j) { errno = EINVAL; ERROR("job passed is NULL, cannot proceed"); return ret; } 45 #define MAX_SQL 512;JOB_STATUS_TYPES 55 #define SELECT_STRING(passed) MK_STRING_LIT(JOB_##passed), 56 const char* job_status_strings[] = { JOB_STATUS_TYPES(
SELECT_STRING) };
93 gchar* status_str = g_strdup_printf(
94 "job:%d status:%s type:%s, priority:%d running:%d finished:%d failed:%d\n",
96 job_status_strings[job->
status],
103 V_JOB(
"JOB_STATUS: %s", status_str);
104 g_output_stream_write(ostr, status_str, strlen(status_str), NULL, NULL);
124 V_JOB(
"JOB[%d]: job status changed: %s => %s\n",
125 job->
id, job_status_strings[job->
status], job_status_strings[new_status]);
146 static gint
job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
148 return ((
job_t*)a)->priority - ((
job_t*)b)->priority;
176 char* type,
char* host,
int id,
int parent_id,
int user_id,
int group_id,
int priority,
char *jq_cmd_args)
186 job->
status = JB_CHECKEDOUT;
200 g_tree_insert(job_list, &job->
id, job);
201 if(
id >= 0) g_sequence_insert_sorted(job_queue, job,
job_compare, NULL);
221 g_mutex_lock(job->
lock);
222 g_mutex_unlock(job->
lock);
223 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32 224 g_mutex_clear(job->
lock);
226 g_mutex_free(job->
lock);
279 const char end[] =
"end\n";
280 GError* error = NULL;
287 memset(buf,
'\0',
sizeof(buf));
288 sprintf(buf,
"scheduler:%d revision:%s daemon:%d jobs:%d log:%s port:%d verbose:%d\n",
293 g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
298 job_t* stat = g_tree_lookup(scheduler->
job_list, ¶ms->second);
305 sprintf(buf,
"ERROR: invalid job id = %d\n", params->second);
306 g_output_stream_write(params->first, buf, strlen(buf), NULL, NULL);
310 g_output_stream_write(params->first, end,
sizeof(end), NULL, NULL);
326 job_t* job = params->first;
330 if(params->first == NULL)
332 tmp_job.
id = params->second;
333 tmp_job.
status = JB_NOT_AVAILABLE;
356 job_t* job = params->first;
362 tmp_job.
id = params->second;
363 tmp_job.
status = JB_PAUSED;
371 if(job->
status != JB_PAUSED)
373 ERROR(
"attempt to restart job %d failed, job status was %s",
374 job->
id, job_status_strings[job->
status]);
401 ((
job_t*)params->first)->priority = params->second;
402 for(iter = ((
job_t*)params->first)->running_agents; iter; iter = iter->next)
403 setpriority(PRIO_PROCESS, ((
agent_t*)iter->data)->pid, params->second);
421 if(job->
status != JB_FAILED)
426 V_JOB(
"JOB[%d]: job failed, killing agents\n", job->
id);
470 V_JOB(
"JOB[%d]: job removed from system\n", job->
id);
473 ((
agent_t*)curr->data)->owner = NULL;
474 for(curr = job->
failed_agents; curr != NULL; curr = curr->next)
475 ((
agent_t*)curr->data)->owner = NULL;
477 ((
agent_t*)curr->data)->owner = NULL;
479 g_tree_remove(job_list, &job->
id);
524 job->
data = g_strdup(data);
550 if(((
agent_t*)iter->data)->status != AG_PAUSED)
553 if(job->
status != JB_PAUSED && job->
status != JB_COMPLETE && finished)
560 aprintf(iter->data,
"CLOSE\n");
589 if(job->
status == JB_CHECKEDOUT)
594 return (job->
idx == 0 && job->
data != NULL);
596 g_mutex_lock(job->
lock);
610 g_mutex_unlock(job->
lock);
632 g_mutex_lock(job->
lock);
637 g_mutex_unlock(job->
lock);
660 file_name = g_strdup_printf(
"%06d", job->
id);
665 ERROR(
"JOB[%d]: job unable to create log file: %s\n", job->
id, file_path);
671 V_JOB(
"JOB[%d]: job created log file:\n %s\n", job->
id, file_path);
694 job_t* retval = NULL;
695 GSequenceIter* beg = g_sequence_get_begin_iter(job_queue);
697 if(g_sequence_get_length(job_queue) != 0)
699 retval = g_sequence_get(beg);
700 g_sequence_remove(beg);
716 if(g_sequence_get_length(job_queue) == 0)
721 beg = g_sequence_get_begin_iter(job_queue);
722 return g_sequence_get(beg);
734 g_tree_foreach(job_list, (GTraverseFunc)
is_active, &count);
void job_verbose_event(scheduler_t *scheduler, job_t *job)
GMutex * lock
Lock to maintain data integrity.
int job_is_open(scheduler_t *scheduler, job_t *job)
Tests to see if there is still data available for this job.
int32_t user_id
The id of the user that created the job.
FILE * fo_RepFwrite(char *Type, char *Filename)
Perform an fwrite. Also creates directories.
void agent_print_status(agent_t *agent, GOutputStream *ostr)
Prints the status of the agent to the output stream provided.
fo_conf * sysconfig
Configuration information loaded from the configuration file.
void job_fail_agent(job_t *job, void *agent)
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job.
void log_destroy(log_t *log)
Free memory associated with the log file.
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
gchar * jq_cmd_args
Command line arguments for this job.
uint16_t i_port
The port that the scheduler is listening on.
void database_update_job(scheduler_t *scheduler, job_t *job, job_status status)
Change the status of a job in the database.
void database_job_log(int j_id, char *log_name)
Enters the name of the log file for a job into the database.
int32_t verbose
The verbose level for all of the agents in this job.
int32_t priority
Importance of the job, maps directory to unix priority.
log_t * log_new_FILE(FILE *log_file, gchar *log_name, gchar *pro_name, pid_t pro_pid)
Creates a log file structure based on an already created FILE*.
PGresult * database_exec(scheduler_t *scheduler, const char *sql)
Executes an sql statement for the scheduler.
PGresult * db_result
Results from the sql query (if any)
char * fo_config_get(fo_conf *conf, const char *group, const char *key, GError **error)
Gets an element based on its group name and key name. If the group or key is not found, the error object is set and NULL is returned.
void job_set_data(scheduler_t *scheduler, job_t *job, char *data, int sql)
char * required_host
If not NULL, this job must run on a specific host machine.
log_t * job_log(job_t *job)
char * job_next(job_t *job)
GTree * job_list
List of jobs that have been created.
uint32_t active_jobs(GTree *job_list)
Gets the number of jobs that are not paused.
void job_destroy(job_t *job)
void job_remove_agent(job_t *job, GTree *job_list, void *agent)
gboolean s_daemon
Is the scheduler being run as a daemon.
#define SafePQclear(pgres)
ssize_t agent_write(agent_t *agent, const void *buf, int count)
job_t * peek_job(GSequence *job_queue)
Gets the job that is at the top of the queue if there is one.
#define SELECT_STRING(passed)
int aprintf(agent_t *agent, const char *fmt,...)
static gint job_compare(gconstpointer a, gconstpointer b, gpointer user_data)
Used to compare two different jobs in the priority queue.
void job_restart_event(scheduler_t *scheduler, arg_int *params)
void job_update(scheduler_t *scheduler, job_t *job)
GList * running_agents
The list of agents assigned to this job that are still working.
int verbose
The verbose flag for the cli.
#define TEST_NULV(a)
Test if paramater is NULL.
void agent_unpause(agent_t *agent)
void agent_pause(agent_t *agent)
char * agent_type
The type of agent used to analyze the data.
job_t * next_job(GSequence *job_queue)
Gets the next job from the job queue.
void job_add_agent(job_t *job, void *agent)
Adds a new agent to the jobs list of agents.
void job_finish_agent(job_t *job, void *agent)
static int job_sstatus(int *job_id, job_t *job, GOutputStream *ostr)
Prints the jobs status to the output stream.
int32_t parent_id
The identifier for the parent of this job (its queue id)
GList * finished_agents
The list of agents that have completed their tasks.
char * fo_RepMkPath(const char *Type, char *Filename)
Given a filename, construct the full path to the file.
gchar * data
The data associated with this job (jq_args)
#define TEST_NULL(a, ret)
Test if paramater is NULL.
gchar * log_name
The name of the log file that will be printed to.
static void job_transition(scheduler_t *scheduler, job_t *job, job_status new_status)
static int is_active(int *job_id, job_t *job, int *counter)
Tests if a job is active.
int32_t group_id
The id of the group that created the job.
void agent_kill(agent_t *agent)
Unclean kill of an agent.
Header file with agent related operations.
gchar * message
Message that will be sent with job notification email.
log_t * log
The log to print any agent logging messages to.
void job_priority_event(scheduler_t *scheduler, arg_int *params)
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job.
gboolean s_pid
The pid of the scheduler process.
job_t * job_init(GTree *job_list, GSequence *job_queue, char *type, char *host, int id, int parent_id, int user_id, int group_id, int priority, char *jq_cmd_args)
Create a new job.
void database_job_priority(scheduler_t *scheduler, job_t *job, int priority)
Changes the priority of a job queue entry in the database.
job_status status
The current status for the job.
Header file for the scheduler.
uint32_t idx
The current index into the sql results.
void job_fail_event(scheduler_t *scheduler, job_t *job)
Events that causes a job to be marked a failed.
GList * failed_agents
The list of agents that failed while working.
int32_t id
The identifier for this job.