36 private $maxJobsPerPage = 10;
38 private $nhours = 672;
43 $this->uploadDao = $uploadDao;
44 $this->logger =
new Logger(self::class);
58 $jobCount = count($upload_pks);
64 $offset = empty($page) ? 0 : $page * $this->maxJobsPerPage;
65 $totalPages = floor($jobCount / $this->maxJobsPerPage);
68 $lastOffset = ($jobCount < $this->maxJobsPerPage) ? $offset+$jobCount : $this->maxJobsPerPage;
69 $statementName = __METHOD__.
"upload_pkforjob";
70 $this->
dbManager->prepare($statementName,
"SELECT job_pk FROM job WHERE job_upload_fk=$1 ORDER BY job_pk ASC");
71 for (; $offset < $lastOffset; $offset++) {
72 $upload_pk = $upload_pks[$offset];
74 $result = $this->
dbManager->execute($statementName, array($upload_pk));
75 while ($row = $this->
dbManager->fetchArray($result)) {
76 $jobArray[] = $row[
'job_pk'];
80 return array($jobArray, $totalPages);
90 $statementName = __METHOD__.
"forjob_name";
93 "SELECT job_name FROM job WHERE job_upload_fk= $1 ORDER BY job_pk ASC",
97 return (empty($row[
'job_name']) ? $uploadId : $row[
'job_name']);
108 public function myJobs($allusers, $page = 0)
111 $offset = empty($page) ? 0 : ($page * $this->maxJobsPerPage) - 1;
113 $allusers_str = ($allusers == 0) ?
"job_user_fk='" .
Auth::getUserId() .
116 $statementName = __METHOD__ .
".countJobs." . $allusers_str;
117 $sql =
"SELECT count(*) AS cnt FROM job WHERE $allusers_str " .
118 "job_queued >= (now() - interval '" . $this->nhours .
" hours');";
120 $countJobs = $this->
dbManager->getSingleRow($sql, [], $statementName)[
'cnt'];
121 $totalPages = floor($countJobs / $this->maxJobsPerPage);
123 $statementName = __METHOD__ .
"." . $allusers_str;
124 $this->
dbManager->prepare($statementName,
125 "SELECT job_pk, job_upload_fk FROM job " .
"WHERE $allusers_str " .
126 "job_queued >= (now() - interval '" . $this->nhours .
" hours') " .
127 "ORDER BY job_queued DESC OFFSET $1 LIMIT " . $this->maxJobsPerPage);
128 $result = $this->
dbManager->execute($statementName, [$offset]);
129 while ($row = $this->
dbManager->fetchArray($result)) {
130 if (! empty($row[
'job_upload_fk'])) {
131 $uploadIsAccessible = $this->uploadDao->isAccessible(
133 if (! $uploadIsAccessible) {
137 $jobArray[] = $row[
'job_pk'];
141 return array($jobArray, $totalPages);
173 foreach ($job_pks as $job_pk) {
175 $statementName = __METHOD__ .
"JobRec";
176 $jobRec = $this->
dbManager->getSingleRow(
177 "SELECT * FROM job WHERE job_pk= $1", array($job_pk),
179 $jobData[$job_pk][
"job"] = $jobRec;
180 if (! empty($jobRec[
"job_upload_fk"])) {
181 $upload_pk = $jobRec[
"job_upload_fk"];
183 $statementName = __METHOD__ .
"UploadRec";
184 $uploadRec = $this->
dbManager->getSingleRow(
185 "SELECT * FROM upload WHERE upload_pk= $1", array($upload_pk),
187 if (! empty($uploadRec)) {
188 $jobData[$job_pk][
"upload"] = $uploadRec;
190 $uploadtree_tablename = $uploadRec[
"uploadtree_tablename"];
191 $statementName = __METHOD__ .
"uploadtreeRec";
192 $uploadtreeRec = $this->
dbManager->getSingleRow(
193 "SELECT * FROM $uploadtree_tablename where upload_fk = $1 and parent is null",
194 array($upload_pk), $statementName);
195 $jobData[$job_pk][
"uploadtree"] = $uploadtreeRec;
197 $statementName = __METHOD__ .
"uploadRecord";
198 $uploadRec = $this->
dbManager->getSingleRow(
199 "SELECT * FROM upload right join job on upload_pk = job_upload_fk where job_upload_fk = $1",
200 array($upload_pk), $statementName);
205 $jobName = $this->
getJobName($uploadRec[
"job_upload_fk"]);
206 $uploadRec[
"upload_filename"] =
"Deleted Upload: " .
207 $uploadRec[
"job_upload_fk"] .
"(" . $jobName .
")";
208 $uploadRec[
"upload_pk"] = $uploadRec[
"job_upload_fk"];
209 $jobData[$job_pk][
"upload"] = $uploadRec;
213 $statementName = __METHOD__ .
"job_pkforjob";
214 $this->
dbManager->prepare($statementName,
215 "SELECT jq.*,jd.jdep_jq_depends_fk FROM jobqueue jq LEFT OUTER JOIN jobdepends jd ON jq.jq_pk=jd.jdep_jq_fk WHERE jq.jq_job_fk=$1 ORDER BY jq_pk ASC");
216 $result = $this->
dbManager->execute($statementName, array(
219 $rows = $this->
dbManager->fetchAll($result);
220 if (! empty($rows)) {
221 foreach ($rows as $jobQueueRec) {
222 $jq_pk = $jobQueueRec[
"jq_pk"];
223 if (array_key_exists($job_pk, $jobData) &&
224 array_key_exists(
'jobqueue', $jobData[$job_pk]) &&
225 array_key_exists($jq_pk, $jobData[$job_pk][
'jobqueue'])) {
226 $jobData[$job_pk][
'jobqueue'][$jq_pk][
"depends"][] = $jobQueueRec[
"jdep_jq_depends_fk"];
228 $jobQueueRec[
"depends"] = array($jobQueueRec[
"jdep_jq_depends_fk"]);
229 $jobData[$job_pk][
'jobqueue'][$jq_pk] = $jobQueueRec;
233 unset($jobData[$job_pk]);
248 $filesPerSec = ($numSecs > 0) ? $itemsprocessed/$numSecs : 0;
260 public function getEstimatedTime($job_pk, $jq_Type=
'', $filesPerSec=0, $uploadId=0, $timeInSec=0)
262 if (!empty($uploadId)) {
263 $itemCount = $this->
dbManager->getSingleRow(
264 "SELECT jq_itemsprocessed FROM jobqueue INNER JOIN job ON jq_job_fk=job_pk " 265 .
" WHERE jq_type LIKE 'ununpack' AND jq_end_bits ='1' AND job_upload_fk=$1",
267 __METHOD__.
'.ununpack_might_be_in_other_job' 270 $itemCount = $this->
dbManager->getSingleRow(
271 "SELECT jq_itemsprocessed FROM jobqueue WHERE jq_type LIKE 'ununpack' AND jq_end_bits ='1' AND jq_job_fk =$1",
273 __METHOD__.
'.ununpack_must_be_in_this_job' 277 if (!empty($itemCount[
'jq_itemsprocessed']) && $jq_Type !==
'decider') {
279 $selectCol =
"jq_type, jq_endtime, jq_starttime, jq_itemsprocessed";
280 if (empty($jq_Type)) {
281 $removeType =
"jq_type NOT LIKE 'ununpack' AND jq_type NOT LIKE 'reportgen' AND jq_type NOT LIKE 'decider' AND jq_type NOT LIKE 'softwareHeritage' AND";
283 $statementName = __METHOD__.
"$selectCol.$removeType";
284 $this->
dbManager->prepare($statementName,
285 "SELECT $selectCol FROM jobqueue WHERE $removeType jq_job_fk =$1 ORDER BY jq_type DESC");
286 $result = $this->
dbManager->execute($statementName, array($job_pk));
288 $statementName = __METHOD__.
"$selectCol.$jq_Type";
289 $this->
dbManager->prepare($statementName,
290 "SELECT $selectCol FROM jobqueue WHERE jq_type LIKE '$jq_Type' AND jq_job_fk =$1");
291 $result = $this->
dbManager->execute($statementName, array($job_pk));
293 $estimatedArray = array();
295 while ($row = $this->
dbManager->fetchArray($result)) {
296 $timeOfCompletion = 0;
297 if (empty($row[
'jq_endtime']) && !empty($row[
'jq_starttime'])) {
298 if (empty($filesPerSec)) {
299 $burnTime = time() - strtotime($row[
'jq_starttime']);
303 if (!empty($filesPerSec)) {
304 $timeOfCompletion = ($itemCount[
'jq_itemsprocessed'] - $row[
'jq_itemsprocessed']) / $filesPerSec;
306 array_push($estimatedArray, $timeOfCompletion);
309 if (empty($estimatedArray)) {
312 $estimatedTime = round(
max($estimatedArray));
313 if (!empty($timeInSec)) {
314 return intval(!empty($estimatedTime) ? $estimatedTime : 0);
316 return intval($estimatedTime/3600).gmdate(
":i:s", $estimatedTime);
328 $statementName = __METHOD__.
"getDataForASingleJob";
329 $this->
dbManager->prepare($statementName,
330 "SELECT *, jq_endtime-jq_starttime as elapsed FROM jobqueue LEFT JOIN job ON job.job_pk = jobqueue.jq_job_fk WHERE jobqueue.jq_pk =$1");
331 $result = $this->
dbManager->execute($statementName, array($jq_pk));
332 $row = $this->
dbManager->fetchArray($result);
343 $statementName = __METHOD__.
"forjq_pk";
345 "SELECT jq_end_bits FROM jobqueue WHERE jq_pk = $1",
349 if ($row[
'jq_end_bits'] == 1 || $row[
'jq_end_bits'] == 2) {
363 $statementName = __METHOD__.
"forjqTypeAndjobId";
365 "SELECT jq_itemsprocessed, job.job_upload_fk FROM jobqueue JOIN job ON jobqueue.jq_job_fk = job.job_pk WHERE jq_type = $1 AND jq_end_bits = 0 AND jq_job_fk IN (SELECT job_pk FROM job WHERE job_upload_fk = (SELECT job_upload_fk FROM job WHERE job_pk = $2 LIMIT 1)) LIMIT 1",
366 array($jqType, $jobId),
369 if (!empty($row[
'jq_itemsprocessed'])) {
370 return array($row[
'jq_itemsprocessed'], $row[
'job_upload_fk']);
382 $sql =
"SELECT jq_type AS job, jq_job_fk, job_upload_fk AS upload_fk, " .
383 "CASE WHEN (jq_endtext IS NULL AND jq_end_bits = 0) THEN 'pending' " .
384 "WHEN (jq_endtext = ANY('{Started,Restarted,Paused}')) THEN 'running' " .
385 "ELSE '' END AS status " .
386 "FROM jobqueue INNER JOIN job " .
387 "ON jq_job_fk = job_pk " .
388 "WHERE jq_endtime IS NULL;";
389 $statement = __METHOD__ .
".getAllUnFinishedJobs";
390 return $this->
dbManager->getRows($sql, [], $statement);
getDataForASingleJob($jq_pk)
Return total Job data with time elapsed.
static getUserId()
Get the current user's id.
getNumItemsPerSec($itemsprocessed, $numSecs)
Returns Number of files/items processed per sec.
getItemsProcessedForDecider($jqType, $jobId)
Return array.
getJobStatus($jqPk)
Return boolean.
uploads2Jobs($upload_pks, $page=0)
Find all the jobs for a given set of uploads.
getEstimatedTime($job_pk, $jq_Type='', $filesPerSec=0, $uploadId=0, $timeInSec=0)
Returns Estimated time using jobid.
myJobs($allusers, $page=0)
Find all of my jobs submitted within the last n hours.
getJobName($uploadId)
Return job name. Used for deleted jobs.
getJobInfo($job_pks)
Get job queue data from db.
FUNCTION int max(int permGroup, int permPublic)
Get the maximum group privilege.
fo_dbManager * dbManager
fo_dbManager object
static getGroupId()
Get the current user's group id.