php pthreads多线程的安装与使用(3)

<?php $fp = fopen("/tmp/lock.txt", "r+"); if (flock($fp, LOCK_EX)) { // 进行排它型锁定 ftruncate($fp, 0); // truncate file fwrite($fp, "Write something here\n"); fflush($fp); // flush output before releasing the lock flock($fp, LOCK_UN); // 释放锁定 } else { echo "Couldn't get the lock!"; } fclose($fp); $fp = fopen('/tmp/lock.txt', 'r+'); if(!flock($fp, LOCK_EX | LOCK_NB)) { echo 'Unable to obtain lock'; exit(-1); } fclose($fp); ?>

八、多线程与数据连接

pthreads 与 pdo 同时使用是,需要注意一点,需要静态声明public static $dbh;并且通过单例模式访问数据库连接。

Worker 与 PDO

<?php class Work extends Stackable { public function __construct() { } public function run() { $dbh = $this->worker->getConnection(); $sql = "select id,name from members order by id desc limit "; $row = $dbh->query($sql); while($member = $row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } class ExampleWorker extends Worker { public static $dbh; public function __construct($name) { } /* * The run method should just prepare the environment for the work that is coming ... */ public function run(){ self::$dbh = new PDO('mysql:host=...;dbname=example','www',''); } public function getConnection(){ return self::$dbh; } } $worker = new ExampleWorker("My Worker Thread"); $work=new Work(); $worker->stack($work); $worker->start(); $worker->shutdown(); ?>

Pool 与 PDO

在线程池中链接数据库

# cat pool.php <?php class ExampleWorker extends Worker { public function __construct(Logging $logger) { $this->logger = $logger; } protected $logger; } /* the collectable class implements machinery for Pool::collect */ class Work extends Stackable { public function __construct($number) { $this->number = $number; } public function run() { $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example_real'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true, PDO::ATTR_PERSISTENT => true ) ); $sql = "select OPEN_TIME, `COMMENT` from MT_TRADES where LOGIN='".$this->number['name']."' and CMD='' and `COMMENT` = '".$this->number['order'].":DEPOSIT'"; #echo $sql; $row = $dbh->query($sql); $mt_trades = $row->fetch(PDO::FETCH_ASSOC); if($mt_trades){ $row = null; $sql = "UPDATE db_example.accounts SET paystatus='成功', deposit_time='".$mt_trades['OPEN_TIME']."' where `order` = '".$this->number['order']."';"; $dbh->query($sql); #printf("%s\n",$sql); } $dbh = null; printf("runtime: %s, %s, %s\n", date('Y-m-d H:i:s'), $this->worker->getThreadId() ,$this->number['order']); } } class Logging extends Stackable { protected static $dbh; public function __construct() { $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'example_real'; // 数据库名 self::$dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); } protected function log($message, $args = []) { $args = func_get_args(); if (($message = array_shift($args))) { echo vsprintf("{$message}\n", $args); } } protected function getConnection(){ return self::$dbh; } } $pool = new Pool(, \ExampleWorker::class, [new Logging()]); $dbhost = 'db.example.com'; // 数据库服务器 $dbuser = 'example.com'; // 数据库用户名 $dbpw = 'password'; // 数据库密码 $dbname = 'db_example'; $dbh = new PDO("mysql:host=$dbhost;port=;dbname=$dbname", $dbuser, $dbpw, array( PDO::MYSQL_ATTR_INIT_COMMAND => 'SET NAMES \'UTF\'', PDO::MYSQL_ATTR_COMPRESS => true ) ); $sql = "select `order`,name from accounts where deposit_time is null order by id desc"; $row = $dbh->query($sql); while($account = $row->fetch(PDO::FETCH_ASSOC)) { $pool->submit(new Work($account)); } $pool->shutdown(); ?> 

进一步改进上面程序,我们使用单例模式 $this->worker->getInstance(); 全局仅仅做一次数据库连接,线程使用共享的数据库连接

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:http://www.heiqu.com/c62774f2a100c95aca10a5b5f62777c6.html