FOSSology  3.2.0rc1
Open Source License Compliance by Open Source Software
interface.c
Go to the documentation of this file.
1 /* **************************************************************
2 Copyright (C) 2010, 2011, 2012 Hewlett-Packard Development Company, L.P.
3 
4 This program is free software; you can redistribute it and/or
5 modify it under the terms of the GNU General Public License
6 version 2 as published by the Free Software Foundation.
7 
8 This program is distributed in the hope that it will be useful,
9 but WITHOUT ANY WARRANTY; without even the implied warranty of
10 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
11 GNU General Public License for more details.
12 
13 You should have received a copy of the GNU General Public License along
14 with this program; if not, write to the Free Software Foundation, Inc.,
15 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
16 ************************************************************** */
22 /* local includes */
23 #include <agent.h>
24 #include <database.h>
25 #include <event.h>
26 #include <interface.h>
27 #include <job.h>
28 #include <logging.h>
29 #include <scheduler.h>
30 
31 /* std library includes */
32 #include <stdio.h>
33 #include <stdlib.h>
34 #include <string.h>
35 #include <limits.h>
36 
37 /* unix library includes */
38 #include <fcntl.h>
39 #include <pthread.h>
40 #include <sys/stat.h>
41 #include <sys/types.h>
42 #include <unistd.h>
43 
44 /* glib includes */
45 #include <glib.h>
46 #include <gio/gio.h>
47 
/* NOTE(review): FIELD_WIDTH is not referenced in the visible part of this file. */
48 #define FIELD_WIDTH 10
/* Size in bytes of the buffer that interface_thread() reads commands into. */
49 #define BUFFER_SIZE 1024
50 
/* Short alias for g_output_stream_write(); not used in the visible code. */
51 #define netw g_output_stream_write
52 
/* NOTE(review): proxy settings; no use of them is visible here - confirm they are still needed. */
53 #define PROXY_PROTOCOL "socks5"
54 #define PROXY_DEFAULT_PORT 1080
55 
56 /* ************************************************************************** */
57 /* **** Data Types ********************************************************** */
58 /* ************************************************************************** */
59 
64 typedef struct interface_connection
65 {
66  GSocketConnection* conn;
67  GInputStream* istr;
68  GOutputStream* ostr;
70 
71 /* ************************************************************************** */
72 /* **** Local Functions ***************************************************** */
73 /* ************************************************************************** */
74 
83  GSocketConnection* conn, GThreadPool* threads)
84 {
85  interface_connection* inter = g_new0(interface_connection, 1);
86 
87  inter->conn = conn;
88  inter->istr = g_io_stream_get_input_stream((GIOStream*)inter->conn);
89  inter->ostr = g_io_stream_get_output_stream((GIOStream*)inter->conn);
90  g_thread_pool_push(threads, inter, NULL);
91 
92  return inter;
93 }
94 
104 {
105  g_object_unref(inter->conn);
106  g_free(inter);
107 }
108 
138 {
139  GMatchInfo* regex_match;
140  job_t* job;
141  char buffer[BUFFER_SIZE];
142  char org[sizeof(buffer)];
143  char* arg1, * arg2, * arg3;
144  char* cmd;
145  arg_int* params;
146  int i;
147 
148  memset(buffer, '\0', sizeof(buffer));
149 
150  while(g_input_stream_read(conn->istr, buffer, sizeof(buffer), scheduler->cancel, NULL) > 0)
151  {
152  V_INTERFACE("INTERFACE: received \"%s\"\n", buffer);
153  /* convert all characters before first ' ' to lower case */
154  memcpy(org, buffer, sizeof(buffer));
155  for(cmd = buffer; *cmd; cmd++)
156  *cmd = g_ascii_tolower(*cmd);
157  g_regex_match(scheduler->parse_interface_cmd, buffer, 0, &regex_match);
158  cmd = g_match_info_fetch(regex_match, 1);
159 
160  if(cmd == NULL)
161  {
162  g_output_stream_write(conn->ostr, "Invalid command: \"", 18, NULL, NULL);
163  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
164  g_output_stream_write(conn->ostr, "\"\n", 2, NULL, NULL);
165  g_match_info_free(regex_match);
166  WARNING("INTERFACE: invalid command: \"%s\"", buffer);
167  continue;
168  }
169 
170  /* acknowledge that you have received the command */
171  V_INTERFACE("INTERFACE: send \"received\"\n");
172  g_output_stream_write(conn->ostr, "received\n", 9, NULL, NULL);
173 
174  /* command: "close"
175  *
176  * The interface has chosen to close the connection. Return the command
177  * in acknowledgment of the command and end this thread.
178  */
179  if(strcmp(cmd, "close") == 0)
180  {
181  g_output_stream_write(conn->ostr, "CLOSE\n", 6, NULL, NULL);
182  V_INTERFACE("INTERFACE: closing connection to user interface\n");
183 
184  g_match_info_free(regex_match);
185  g_free(cmd);
186  return;
187  }
188 
189  /* command: "stop"
190  *
191  * The interface has instructed the scheduler to shut down gracefully. The
192  * scheduler will wait for all currently executing agents to finish
193  * running, then exit the vent loop.
194  */
195  else if(strcmp(cmd, "stop") == 0)
196  {
197  g_output_stream_write(conn->ostr, "CLOSE\n", 6, NULL, NULL);
198  V_INTERFACE("INTERFACE: shutting down scheduler gracefully\n");
199  event_signal(scheduler_close_event, (void*)0);
200 
201  g_match_info_free(regex_match);
202  g_free(cmd);
203  return;
204  }
205 
206  /* command: "die"
207  *
208  * The interface has instructed the scheduler to shut down. The scheduler
209  * should acknowledge the command and proceed to kill all current executing
210  * agents and exit the event loop
211  */
212  else if(strcmp(cmd, "die") == 0)
213  {
214  g_output_stream_write(conn->ostr, "CLOSE\n", 6, NULL, NULL);
215  V_INTERFACE("INTERFACE: killing the scheduler\n");
216  event_signal(scheduler_close_event, (void*)1);
217 
218  g_match_info_free(regex_match);
219  g_free(cmd);
220  return;
221  }
222 
223  /* command: "load"
224  *
225  * The interface has requested information about the load that the different
226  * hosts are under. The scheduler should respond with the status of all the
227  * hosts.
228  */
229  else if(strcmp(cmd, "load") == 0)
230  {
231  print_host_load(scheduler->host_list, conn->ostr);
232  }
233 
234  /* command: "kill <job_id> <"message">"
235  *
236  * The interface has instructed the scheduler to kill and fail a particular
237  * job. Both arguments are required for this command.
238  *
239  * job_id: The jq_pk for the job that needs to be killed
240  * message: A message that will be in the email notification and the
241  * jq_endtext field of the job queue
242  */
243  else if(strcmp(cmd, "kill") == 0)
244  {
245  arg1 = g_match_info_fetch(regex_match, 3);
246  arg2 = g_match_info_fetch(regex_match, 8);
247 
248  if(arg1)
249  i = atoi(arg1);
250  if(arg1 == NULL || arg2 == NULL || strlen(arg1) == 0 || strlen(arg2) == 0)
251  {
252  g_free(cmd);
253  cmd = g_strdup_printf("Invalid kill command: \"%s\"\n", buffer);
254  g_output_stream_write(conn->ostr, cmd, strlen(cmd), NULL, NULL);
255  }
256  else if((job = g_tree_lookup(scheduler->job_list, &i)) == NULL)
257  {
258  arg3 = g_strdup_printf(jobsql_failed, arg2, i);
259  event_signal(database_exec_event, arg3);
260  }
261  else
262  {
263  if(job->message)
264  g_free(job->message);
265  job->message = strdup(((arg2 == NULL) ? "no message" : arg2));
266  event_signal(job_fail_event, job);
267  }
268 
269  g_free(arg1);
270  g_free(arg2);
271  }
272 
273  /* command: "pause <job_id>"
274  *
275  * The interface has instructed the scheduler to pause a job. This is used
276  * to free up resources on a particular host. The argument is required and
277  * is the jq_pk for the job that needs to be paused.
278  */
279  else if(strcmp(cmd, "pause") == 0)
280  {
281  arg1 = g_match_info_fetch(regex_match, 3);
282 
283  if(arg1 == NULL || strlen(arg1) == 0)
284  {
285  arg1 = g_strdup_printf("Invalid pause command: \"%s\"\n", buffer);
286  WARNING("received invalid pause command: %s", buffer);
287  g_output_stream_write(conn->ostr, arg1, strlen(arg1), NULL, NULL);
288  g_free(arg1);
289  }
290  else
291  {
292  params = g_new0(arg_int, 1);
293  params->second = atoi(arg1);
294  params->first = g_tree_lookup(scheduler->job_list, &params->second);
295  event_signal(job_pause_event, params);
296  g_free(arg1);
297  }
298  }
299 
300  /* command: "reload"
301  *
302  * The scheduler should reload its configuration information. This should
303  * be used if a change to an agent or fossology.conf has been made since
304  * the scheduler started running.
305  */
306  else if(strcmp(cmd, "reload") == 0)
307  {
308  event_signal(scheduler_config_event, NULL);
309  }
310 
311  /* command: "agents"
312  *
313  * The interface has requested a list of agents that the scheduler is able
314  * to run correctly.
315  */
316  else if(strcmp(cmd, "agents") == 0)
317  {
318  event_signal(list_agents_event, conn->ostr);
319  }
320 
321  /* command: "status [job_id]"
322  *
323  * fetches the status of the a particular job or the scheduler. The
324  * argument is not required for this command.
325  *
326  * with job_id:
327  * print job status followed by status of agent belonging to the job
328  * without job_id:
329  * print scheduler statsu followed by status of every job
330  */
331  else if(strcmp(cmd, "status") == 0)
332  {
333  arg1 = g_match_info_fetch(regex_match, 3);
334 
335  params = g_new0(arg_int, 1);
336  params->first = conn->ostr;
337  params->second = (arg1 == NULL) ? 0 : atoi(arg1);
338  event_signal(job_status_event, params);
339 
340  g_free(arg1);
341  }
342 
343  /* command: "restart <job_id>"
344  *
345  * The interface has instructed the scheduler to restart a job that has been
346  * paused. The argument for this command is required and is the jq_pk for
347  * the job that should be restarted.
348  */
349  else if(strcmp(cmd, "restart") == 0)
350  {
351  arg1 = g_match_info_fetch(regex_match, 3);
352 
353  if(arg1 == NULL)
354  {
355  arg1 = g_strdup(buffer);
356  WARNING("received invalid restart command: %s", buffer);
357  snprintf(buffer, sizeof(buffer) - 1,
358  "ERROR: Invalid restart command: %s\n", arg1);
359  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
360  g_free(arg1);
361  }
362  else
363  {
364  params = g_new0(arg_int, 1);
365  params->second = atoi(arg1);
366  params->first = g_tree_lookup(scheduler->job_list, &params->second);
367  event_signal(job_restart_event, params);
368  g_free(arg1);
369  }
370  }
371 
372  /* command: "verbose <job_id|level> [level]"
373  *
374  * The interface has either requested a change in a verbose level, or it
375  * has requested the current verbose level. This command can have no
376  * arguments, 1 argument or 2 arguments.
377  *
378  * no arguments: respond with the verbose level of the scheduler
379  * 1 argument: change the verbose level of the scheduler to the argument
380  * 2 arguments: change the verbose level of the job with the jq_pk of the
381  * first arguement to the second argument
382  */
383  else if(strcmp(cmd, "verbose") == 0)
384  {
385  arg1 = g_match_info_fetch(regex_match, 3);
386  arg2 = g_match_info_fetch(regex_match, 5);
387 
388  if(arg1 == NULL)
389  {
390  if(verbose < 8)
391  {
392  sprintf(buffer, "level: %d\n", verbose);
393  }
394  else
395  {
396  strcpy(buffer, "mask: h d i e s a j\nmask: ");
397  for(i = 1; i < 0x10000; i <<= 1)
398  strcat(buffer, i & verbose ? "1 " : "0 ");
399  strcat(buffer, "\n");
400  }
401  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
402  }
403  else if(arg2 == NULL)
404  {
405  verbose = atoi(arg1);
406  g_free(arg1);
407  }
408  else
409  {
410  i = atoi(arg1);
411  if((job = g_tree_lookup(scheduler->job_list, &i)) == NULL)
412  {
413  g_free(cmd);
414  cmd = g_strdup_printf("Invalid verbose command: \"%s\"\n", buffer);
415  g_output_stream_write(conn->ostr, cmd, strlen(cmd), NULL, NULL);
416  }
417  else
418  {
419  job->verbose = atoi(arg2);
420  event_signal(job_verbose_event, job);
421  }
422 
423  g_free(arg1);
424  g_free(arg2);
425  }
426  }
427 
428  /* command: "priority <job_id> <level>"
429  *
430  * Scheduler should change the priority of a job. This will change the
431  * systems priority of the relevant job and change the priority of the job
432  * in the database to match. Both arguments are required for this command.
433  */
434  else if(strcmp(cmd, "priority") == 0)
435  {
436  arg1 = g_match_info_fetch(regex_match, 3);
437  arg2 = g_match_info_fetch(regex_match, 5);
438 
439  if(arg1 != NULL && arg2 != NULL)
440  {
441  i = atoi(arg1);
442 
443  params = g_new0(arg_int, 1);
444  params->first = g_tree_lookup(scheduler->job_list, &i);
445  params->second = atoi(arg2);
446  event_signal(job_priority_event, params);
447  g_free(arg1);
448  g_free(arg2);
449  }
450  else
451  {
452  if(arg1) g_free(arg1);
453  if(arg2) g_free(arg2);
454 
455  arg1 = g_strdup(buffer);
456  WARNING("Invalid priority command: %s\n", buffer);
457  snprintf(buffer, sizeof(buffer) - 1,
458  "ERROR: Invalid priority command: %s\n", arg1);
459  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
460  g_free(arg1);
461  }
462  }
463 
464  /* command: "database"
465  *
466  * The scheduler should check the database. This will normaly be sent by
467  * the ui when a new job has been queue and must be run.
468  */
469  else if(strcmp(cmd, "database") == 0)
470  {
471  event_signal(database_update_event, NULL);
472  }
473 
474  /* command: unknown
475  *
476  * The command sent does not match any of the known commands, log an error
477  * and inform the interface that this wasn't a command.
478  */
479  else
480  {
481  g_output_stream_write(conn->ostr, "Invalid command: \"", 18, NULL, NULL);
482  g_output_stream_write(conn->ostr, buffer, strlen(buffer), NULL, NULL);
483  g_output_stream_write(conn->ostr, "\"\n", 2, NULL, NULL);
484  con_printf(main_log, "ERROR %s.%d: Interface received invalid command: %s\n", __FILE__, __LINE__, cmd);
485  }
486 
487  g_match_info_free(regex_match);
488  g_free(cmd);
489  memset(buffer, '\0', sizeof(buffer));
490  }
491 
493  return;
494 }
495 
506 {
507  GSocketListener* server_socket;
508  GSocketConnection* new_connection;
509  GError* error = NULL;
510 
511  /* validate new thread */
512  if(scheduler->i_terminate || !scheduler->i_created)
513  {
514  ERROR("Could not create server socket thread\n");
515  return (void*)0;
516  }
517 
518  /* create the server socket to listen for connections on */
519  server_socket = g_socket_listener_new();
520  if(server_socket == NULL)
521  FATAL("could not create the server socket");
522 
523  g_socket_listener_add_inet_port(server_socket, scheduler->i_port, NULL, &error);
524  if(error)
525  FATAL("[port:%d]: %s", scheduler->i_port, error->message);
526  scheduler->cancel = g_cancellable_new();
527 
528  V_INTERFACE("INTERFACE: listening port is %d\n", scheduler->i_port);
529 
530  /* wait for new connections */
531  for(;;)
532  {
533  new_connection = g_socket_listener_accept(server_socket, NULL,
534  scheduler->cancel, &error);
535 
536  if(scheduler->i_terminate)
537  break;
538  V_INTERFACE("INTERFACE: new interface connection\n");
539  if(error)
540  FATAL("INTERFACE closing for %s", error->message);
541 
542  interface_conn_init(new_connection, scheduler->workers);
543  }
544 
545  V_INTERFACE("INTERFACE: socket listening thread closing\n");
546  g_socket_listener_close(server_socket);
547  g_object_unref(server_socket);
548  return (void*)1;
549 }
550 
551 /* ************************************************************************** */
552 /* **** Constructor Destructor ********************************************** */
553 /* ************************************************************************** */
554 
565 void interface_init(scheduler_t* scheduler)
566 {
567  if(!scheduler->i_created)
568  {
569  scheduler->i_created = 1;
570  scheduler->i_terminate = 0;
571 
572  scheduler->cancel = NULL;
573  scheduler->workers = g_thread_pool_new((GFunc)interface_thread,
574  scheduler, CONF_interface_nthreads, FALSE, NULL);
575 
576 #if GLIB_MAJOR_VERSION >= 2 && GLIB_MINOR_VERSION >= 32
577  scheduler->server = g_thread_new("interface",
578  (GThreadFunc)interface_listen_thread, scheduler);
579 #else
580  scheduler->server = g_thread_create((GThreadFunc)interface_listen_thread,
581  scheduler, TRUE, NULL);
582 #endif
583 
584  while(scheduler->cancel == NULL)
585  usleep(100);
586  }
587  else
588  {
589  WARNING("Multiple attempts made to initialize the interface");
590  }
591 }
592 
600 {
601  /* only destroy the interface if it has been created */
602  if(scheduler->i_created)
603  {
604  scheduler->i_terminate = 1;
605  scheduler->i_created = 0;
606 
607  g_cancellable_cancel(scheduler->cancel);
608  g_thread_join(scheduler->server);
609  g_thread_pool_free(scheduler->workers, FALSE, TRUE);
610 
611  scheduler->server = NULL;
612  scheduler->cancel = NULL;
613  scheduler->workers = NULL;
614  }
615  else
616  {
617  WARNING("Attempt to destroy the interface without initializing it");
618  }
619 }
void job_verbose_event(scheduler_t *scheduler, job_t *job)
Definition: job.c:255
GTree * host_list
List of all hosts available to the scheduler.
Definition: scheduler.h:171
void interface_thread(interface_connection *conn, scheduler_t *scheduler)
Function that will run the thread associated with a particular interface instance.
Definition: interface.c:137
void scheduler_close_event(scheduler_t *scheduler, void *killed)
Sets the closing flag and possibly kills all currently running agents.
Definition: scheduler.c:1025
void list_agents_event(scheduler_t *scheduler, GOutputStream *ostr)
Receive agent on interface.
Definition: agent.c:1129
#define ERROR(...)
Definition: logging.h:90
GCancellable * cancel
Used to stop the listening thread when it is running.
Definition: scheduler.h:180
void job_pause_event(scheduler_t *scheduler, arg_int *params)
Event to pause a job.
Definition: job.c:323
void interface_destroy(scheduler_t *scheduler)
Closes the server socket and thread pool that service UI connections.
Definition: interface.c:599
GRegex * parse_interface_cmd
Parses the commands received by the interface.
Definition: scheduler.h:199
static void interface_conn_destroy(interface_connection *inter)
Free the memory associated with an interface connection.
Definition: interface.c:103
gboolean i_created
Has the interface been created.
Definition: scheduler.h:175
void database_update_event(scheduler_t *scheduler, void *unused)
Checks the job queue for any new entries.
Definition: database.c:852
#define FATAL(...)
Definition: logging.h:74
uint16_t i_port
The port that the scheduler is listening on.
Definition: scheduler.h:177
GThread * server
Thread that is listening to the server socket.
Definition: scheduler.h:178
int32_t verbose
The verbose level for all of the agents in this job.
Definition: job.h:82
Log related operations.
GTree * job_list
List of jobs that have been created.
Definition: scheduler.h:183
#define BUFFER_SIZE
Maximum buffer length.
Definition: fossconfig.c:113
void job_restart_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:353
int verbose
The verbose flag for the cli.
Definition: fo_cli.c:49
log_t * main_log
Definition: logging.c:44
gboolean i_terminate
Has the interface been terminated.
Definition: scheduler.h:176
void database_exec_event(scheduler_t *scheduler, char *sql)
Definition: database.c:838
void print_host_load(GTree *host_list, GOutputStream *ostr)
Prints the host information to ostr.
Definition: host.c:186
GSocketConnection * conn
The socket that is our connection.
Definition: interface.c:66
char buffer[2048]
The last thing received from the scheduler.
Event handling operations.
GInputStream * istr
Stream to read from the interface.
Definition: interface.c:67
const char * jobsql_failed
static interface_connection * interface_conn_init(GSocketConnection *conn, GThreadPool *threads)
Definition: interface.c:82
Header file with agent related operations.
struct interface_connection interface_connection
gchar * message
Message that will be sent with job notification email.
Definition: job.h:80
void scheduler_config_event(scheduler_t *scheduler, void *unused)
Load both the fossology configuration and all the agent configurations.
Definition: scheduler.c:1002
The job structure.
Definition: job.h:61
void interface_init(scheduler_t *scheduler)
Create the interface thread and thread pool that handle UI connections.
Definition: interface.c:565
void * interface_listen_thread(scheduler_t *scheduler)
Function that will listen for new connections to the server sockets.
Definition: interface.c:505
Definition: event.h:56
void job_priority_event(scheduler_t *scheduler, arg_int *params)
Definition: job.c:396
void job_status_event(scheduler_t *scheduler, arg_int *params)
Event to get the status of the scheduler or a specific job.
Definition: job.c:277
GThreadPool * workers
Threads to handle incoming network communication.
Definition: scheduler.h:179
GOutputStream * ostr
Stream to write to the interface.
Definition: interface.c:68
Header file for the scheduler.
void job_fail_event(scheduler_t *scheduler, job_t *job)
Event that causes a job to be marked as failed.
Definition: job.c:417