McOuni
Thu 30th Dec '04, 5:20pm
I have a script, which have to connect to list of servers and insert some stats derived from them into the MYSQL database.
I'm using while+fsockopen() now, but my script works like a turtle waiting each
answer from each server.
But it will be more better to create for example 10 fsockopen connections using non-block mode at the same time, and process them in turn using socket_select() function.
But i do not know, how to create such alhoritm. Hellllppp Please!!!:(
Alan @ CIT
Mon 10th Jan '05, 1:28pm
You'd be better of using fork() to do this I suspect.
There's a good example in the PHP mannual on how to fork a function a number of times. What you could do, is just put your socket code into a function, then fork that function 10 times so they all run at the same time.
Take a look at the user comment from "Jazeps Basko" at http://uk.php.net/manual/en/function.pcntl-fork.php for an idea of how to do it.
Thanks,
Alan.
Nemon
Tue 11th Jan '05, 5:10am
I has the same problem on windows, heres a messy bit of code i made.
Basicly what im wanting todo here is connect to a hundred or so pages and parse them for IPs. Obviosly waiting on each connection is out of the question, so. What i did was use non blocking sockets (because it was a windows machine) and use socket_select on 64 at a time. 64 because windows server crashed php on any more. I think 64 is max the underlying native code supports on windows server.
Without doubt there are a million and one ways to improve and fix bellow code but it did just what i wanted.
<?php
error_reporting( E_ERROR ^ E_WARNING );
// Processing queue messages.
define( MSG_REMOVE, 1 ); // Remove a "network app" from processing pump.
define( MSG_DONE, 2 ); // Network app complete move to done queue.
define( MSG_KEEP, 3 ); // Network app still processing, keep in processing queue.
// Network object states.
define( STATE_DISCONNECTED, 1 );
define( STATE_CONNECTING, 2 );
define( STATE_CONNECTED, 3 );
define( STATE_READING, 4 );
define( STATE_WRITEING, 5 );
class CGetPage
{
var $host = "";
var $page = "";
var $socket = null;
var $page_url = "";
var $sent;
var $send_size = "";
var $send_buffer = ""; // Buffer we receive the
var $recv_size = "";
var $recv_buffer = "";
function __construct( $page_url )
{
$this->page_url = $page_url;
$matches = array();
preg_match( "/(http:\/\/)?([^\/]+)(.*)/i", $page_url, $matches );
$this->host = $matches[2];
$this->page = $matches[3];
// Create socket.
$this->socket = socket_create( AF_INET, SOCK_STREAM, SOL_TCP );
socket_set_nonblock( $this->socket );
$this->send_buffer = "GET " . $this->page . " HTTP/1.0\r\n" .
"Host: " . $this->host . "\r\n\r\n";
$this->send_size = strlen( $this->send_buffer );
$this->sent = 0;
$this->state = STATE_DISCONNECTED;
}
function __destruct()
{
socket_close( $this->socket );
}
function get_select( &$rfd, &$wfd, &$efd )
{
$efd[] = $this->socket;
switch( $this->state )
{
case STATE_READING:
$rfd[] = $this->socket;
break;
case STATE_DISCONNECTED:
case STATE_CONNECTING:
case STATE_CONNECTED:
case STATE_WRITING:
$wfd[] = $this->socket;
break;
}
}
function process_select( $readable, $writeable, $exception )
{
if( $exception )
{
echo "Exception on socket: " . intval($this->socket) . "<br>";
if( $this->state == STATE_CONNECTING )
echo "Failed to connect to: " . $this->host . "<br>";
return MSG_REMOVE;
}
switch( $this->state )
{
case STATE_DISCONNECTED:
echo "<b>Connecting to host:</b> " . $this->host . "<br>";
@socket_connect( $this->socket, $this->host, 80 );
$this->state = STATE_CONNECTING;
return MSG_KEEP;
break;
case STATE_CONNECTING:
if( !$writeable )
return MSG_KEEP;
echo "<b>Connected to host:</b> " . $this->host . "<br>";
$this->state = STATE_CONNECTED;
return MSG_KEEP;
break;
case STATE_CONNECTED:
if( !$writeable )
return MSG_KEEP;
echo "<b>Requesting page:</b> " . $this->host . $this->page . "<br>";
$sent = socket_write( $this->socket, $this->send_buffer, $this->send_size );
if( $sent <= 0 )
{
echo "Write error on socket: " . intval($this->socket) . "<br>Host: " . $this->host . "<br>";
return MSG_REMOVE;
}
if( $sent < $this->send_size )
die( "Send requires multi parts...<br>" );
$this->state = STATE_READING;
return MSG_KEEP;
break;
case STATE_READING:
if( !$readable )
return MSG_KEEP;
$buffer = "";
$recv = socket_recv( $this->socket, $buffer, 8000, 0 );
if( $recv <= 0 )
{
if( !(strlen($buffer) + strlen($this->recv_buffer)) )
echo "Received nothing!!!<br>";
echo "<b>Page received:</b> " . $this->host . $this->page . " <b>Size:</b> " . round(($this->recv_size/1024),2) . "KB<br>";
return MSG_DONE;
}
$this->recv_size += $recv;
$this->recv_buffer .= $buffer; // Concat buffers.
return MSG_KEEP;
break;
} // END SWITCH
} // END PROCESS_SELECT
}
$page_list = array();
$done_list = array();
$fail_list = array();
$page = new CGetPage( "http://www.somesite.com/page.html" );
$page_list[ intval($page->socket) ] = $page;
echo "Requesting " . count($page_list) . " pages.<br>";
ob_start();
foreach( $page_list as $page )
echo "<b>Page:</b> " . $page->host . $page->page . "<br>";
while( count($page_list) )
{
$select_counter = 0;
$select_read = $select_write = $select_exception = array();
foreach( $page_list as $page )
{
// Windows select limit causes access violation, dont select more than 64 sockets at a time.
if( $select_counter++ == 64 ) break;
$page->get_select( $select_read, $select_write, $select_exception );
// Initialize conneciton.
if( $page->state == STATE_DISCONNECTED )
$page->process_select( false, true, false );
}
socket_select( $select_read, $select_write, $select_exception, 0, 200 );
$sockets_selected = array_unique( array_merge($select_read,$select_write,$select_exc eption) );
foreach( $sockets_selected as $socket )
{
$page = $page_list[ intval($socket) ];
$selected_read = in_array( $socket, $select_read );
$selected_write = in_array( $socket, $select_write );
$selected_exception = in_array( $socket, $select_exception );
switch( $page->process_select($selected_read, $selected_write, $selected_exception) )
{
case MSG_REMOVE:
echo "Removing " . intval($page->socket) . "<br>";
$page_list = array_diff( $page_list, array($page) );
$fail_list[] = $page;
break;
case MSG_KEEP:
break;
case MSG_DONE:
$page_list = array_diff( $page_list, array($page) );
$done_list[] = $page;
break;
}
}
}
if( isset($_GET['debug']) )
ob_end_flush();
else
ob_end_clean();
echo "Requests complete.<br><br>";
echo "Parsing pages.<br>";
// Parse all the pages for IPs.
$dbcon = new mysqli( "REMOVED", "REMOVED", "REMOVED", "REMOVED" );
$ip_list = array();
$ip_done_list = array();
foreach( $done_list as $page )
{
if( preg_match_all( "/\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}/", $page->recv_buffer, $matches) )
{
$ip_list = array_unique( $matches[0] );
$ip_list = array_diff( $ip_list, $ip_done_list );
$ip_done_list = array_merge( $ip_done_list, $ip_list );
// This code SUCKS, I KNOW!
foreach( $ip_list as $ip )
{
$result = $dbcon->query( "SELECT id FROM proxys WHERE ip = '" . $ip . "'" );
if( $result === FALSE ) die( "Query error." );
if( $result->num_rows == 0 )
{
$dbcon->query( "INSERT INTO proxys (ip,note) VALUES('" . $ip . "','" . $page->page_url . "')" );
echo "<b>Inserted IP:</b> " . $ip . "<br>";
}
$result->close();
}
}
}
$dbcon->close();
ob_end_flush();
echo "Update Complete.<br>";
?>
vBulletin® v3.8.0 Release Candidate 1, Copyright ©2000-2008, Jelsoft Enterprises Ltd.