-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpipe_processor.php
More file actions
123 lines (105 loc) · 3.87 KB
/
pipe_processor.php
File metadata and controls
123 lines (105 loc) · 3.87 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
<?php
require_once('twitter_config.php');
require_once('error_handler.php');
/**
 * Base class for long-running workers that consume messages from a System V
 * message queue and hand each one to process().
 *
 * Subclasses implement process() and may override post_message() to react to
 * a failed/empty receive (for example by sleeping). The class name of the
 * concrete subclass is used as the script name when checking for an already
 * running instance and when sending failure reports.
 *
 * Relies on a global $log object exposing logInfo(), and on the
 * EmailFailReport() / FormatExceptionMessage() helpers being loaded.
 */
abstract class pipe_processor
{
    /** Set by stop(); the receive loop in run() ends once this is true. */
    public $_stopped = false;

    /**
     * Flags passed to msg_receive(). NULL (treated as 0) means block until a
     * message arrives; a subclass may set e.g. MSG_IPC_NOWAIT.
     */
    public $_receive_option = NULL;

    /** Timestamp (microtime(true)) taken when the processor was constructed. */
    public $start_time;

    /** memory_get_usage() value recorded at construction, used as a baseline. */
    public $init_memory;

    /** Class name of the concrete processor; filled in by can_start(). */
    public $file_name;

    /**
     * Records the start time and memory baseline, switches to the script's
     * directory and lifts the execution time limit so the worker can run
     * indefinitely.
     */
    public function __construct()
    {
        global $log;

        $this->start_time = microtime(true);
        $this->change_dir();

        ini_set('max_execution_time', '12000');
        // Keep running even if the client that launched the script disconnects.
        ignore_user_abort(true);
        // 0 removes the execution time limit entirely (overrides the value above).
        set_time_limit(0);

        $this->init_memory = memory_get_usage();
        $log->logInfo("Current memory usage: ".memory_get_usage());
    }

    /**
     * Legacy PHP 4-style constructor name, kept so existing subclasses that
     * call parent::pipe_processor() keep working.
     *
     * Uses self:: rather than $this-> so a subclass that overrides
     * __construct() and calls this method does not recurse into itself.
     */
    public function pipe_processor()
    {
        self::__construct();
    }

    /**
     * Changes the working directory to the directory part of
     * $_SERVER['PHP_SELF'] (everything before the last "/segment").
     * Failures are deliberately suppressed.
     */
    public function change_dir()
    {
        $script_dir = preg_replace('/\\/[^\\/]+$/', "", $_SERVER['PHP_SELF']);
        @chdir($script_dir);
    }

    /**
     * Reports whether more than $max processes matching $script show up in
     * the `ps aux` listing (ignoring grep itself, /tmp entries and
     * stream_startd.php).
     *
     * @param string $script Pattern to look for in the process list.
     * @param int    $max    Number of matching processes that is still acceptable.
     * @return bool True when the number of matches exceeds $max.
     */
    public function is_running($script, $max = 1)
    {
        $pids = array();

        // Quote the pattern so shell metacharacters in $script cannot alter the command.
        $command = 'ps aux | grep '.escapeshellarg((string) $script)
            .' | grep -v grep | grep -v /tmp | grep -v stream_startd.php'
            .' | awk \'{print $2}\'';
        exec($command, $pids);

        return (count($pids) > $max);
    }

    /**
     * Stores the concrete class name in $file_name and terminates the script
     * (silently, via exit) when is_running() reports more than one matching
     * process.
     */
    public function can_start()
    {
        global $log;

        $this->file_name = get_class($this);
        $log->logInfo('Script: '.$this->file_name);

        if ($this->is_running($this->file_name, 1)) {
            exit;
        }
    }

    /**
     * Main loop: receives messages from the queue identified by $pipe_id and
     * passes each payload to process() until stop() is called.
     *
     * Exceptions thrown while handling a message are logged and the loop
     * continues. On exit a termination report is emailed and memory/time
     * statistics are logged.
     *
     * @param int $pipe_id Key of the System V message queue to read from.
     */
    public function run($pipe_id)
    {
        global $log;

        $this->can_start();

        $queue = msg_get_queue($pipe_id);
        if ($queue === false) {
            // Without a queue every receive would fail and the loop would spin forever.
            $message = 'Failed to open queue #'.$pipe_id;
            $log->logInfo($message);
            EmailFailReport($this->file_name, $message);
            return;
        }

        // 0 = accept the next message in the queue regardless of its type.
        $desired_type = 0;
        // Upper bound on the size of a single received message.
        $max_size = 1000000;
        // NULL means "no flags" (blocking receive); msg_receive() expects an int.
        $flags = (int) $this->_receive_option;

        $this->_stopped = false;
        while ($this->can_continue()) {
            try {
                $log->logInfo('Listening to queue #'.$pipe_id);

                $received_type = null;
                $data = null;
                $err = null;
                $received = msg_receive(
                    $queue,
                    $desired_type,
                    $received_type,
                    $max_size,
                    $data,
                    true,
                    $flags,
                    $err
                );

                if ($received !== true) {
                    // NOTE(review): with a non-blocking receive option an empty
                    // queue also ends up here and sends a failure email on every
                    // poll - confirm that this is intended.
                    $this->post_message();
                    $message = ('Failed to receive message from queue: '.print_r($err, true));
                    $log->logInfo($message);
                    EmailFailReport($this->file_name, $message);
                    continue;
                }

                $log->logInfo('Received a message...');
                $this->process($data);

                $queue_status = msg_stat_queue($queue);
                $log->logInfo('Queue length: '.$queue_status['msg_qnum']);
            } catch (Exception $exp) {
                $message = FormatExceptionMessage($exp);
                $log->logInfo($message);
            }
        }

        // Script name and message are passed separately, matching the
        // receive-failure report above.
        $terminating = $this->file_name.' is terminating';
        EmailFailReport($this->file_name, $terminating);
        $log->logInfo($this->file_name.' is terminating...');
        // Parentheses are required: before PHP 8 "." and "-" share precedence,
        // so the unparenthesised form subtracted from the concatenated string.
        $log->logInfo("Memory consumed: ".(memory_get_usage() - $this->init_memory));
        $log->logInfo("Time taken: ".(microtime(true) - $this->start_time));
    }

    /**
     * @return bool True while stop() has not been called.
     */
    public function can_continue()
    {
        return !$this->_stopped;
    }

    /**
     * Asks the receive loop in run() to finish; it is checked at the top of
     * each iteration.
     */
    public function stop()
    {
        $this->_stopped = true;
    }

    /**
     * Hook invoked when a receive attempt did not yield a message.
     * Does nothing here; subclasses may override it (e.g. to sleep).
     */
    public function post_message()
    {
    }

    /**
     * Handles one message payload taken from the queue.
     *
     * @param mixed $data Unserialized message body returned by msg_receive().
     */
    abstract public function process($data);
}
?>