php pthreads多线程的安装与使用
安装Pthreads基本上需要重新编译PHP,加上--enable-maintainer-zts参数,但是用这个文档很少;bug会很多很有很多意想不到的问题,生成环境上只能呵呵了,所以这个东西玩玩就算了,真正多线程还是用Python、C等等
一、安装
这里使用的是php-7.0.2
./configure\ --prefix=/usr/local/php7\ --with-config-file-path=/etc\ --with-config-file-scan-dir=/etc/php.d\ --enable-debug\ --enable-maintainer-zts\ --enable-pcntl\ --enable-fpm\ --enable-opcache\ --enable-embed=shared\ --enable-json=shared\ --enable-phpdbg\ --with-curl=shared\ --with-mysql=/usr/local/mysql\ --with-mysqli=/usr/local/mysql/bin/mysql_config\ --with-pdo-mysql
make&&makeinstall
安装pthreads
peclinstallpthreads
二、Thread
<?php #1 $thread=newclassextendsThread{ publicfunctionrun(){ echo"HelloWorld{$this->getThreadId()}\n"; } }; $thread->start()&&$thread->join(); #2 classworkerThreadextendsThread{ publicfunction__construct($i){ $this->i=$i; } publicfunctionrun(){ while(true){ echo$this->i."\n"; sleep(1); } } } for($i=0;$i<50;$i++){ $workers[$i]=newworkerThread($i); $workers[$i]->start(); } ?>
三、Worker与Stackable
StackablesaretasksthatareexecutedbyWorkerthreads.Youcansynchronizewith,read,andwriteStackableobjectsbefore,afterandduringtheirexecution.
<?php classSQLQueryextendsStackable{ publicfunction__construct($sql){ $this->sql=$sql; } publicfunctionrun(){ $dbh=$this->worker->getConnection(); $row=$dbh->query($this->sql); while($member=$row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } classExampleWorkerextendsWorker{ publicstatic$dbh; publicfunction__construct($name){ } publicfunctionrun(){ self::$dbh=newPDO('mysql:host=10.0.0.30;dbname=testdb','root','123456'); } publicfunctiongetConnection(){ returnself::$dbh; } } $worker=newExampleWorker("MyWorkerThread"); $sql1=newSQLQuery('select*fromtestorderbyiddesclimit1,5'); $worker->stack($sql1); $sql2=newSQLQuery('select*fromtestorderbyiddesclimit5,5'); $worker->stack($sql2); $worker->start(); $worker->shutdown(); ?>
四、互斥锁
什么情况下会用到互斥锁?在你需要控制多个线程同一时刻只能有一个线程工作的情况下可以使用。一个简单的计数器程序,说明有无互斥锁情况下的不同
<?php $counter=0; $handle=fopen("/tmp/counter.txt","w"); fwrite($handle,$counter); fclose($handle); classCounterThreadextendsThread{ publicfunction__construct($mutex=null){ $this->mutex=$mutex; $this->handle=fopen("/tmp/counter.txt","w+"); } publicfunction__destruct(){ fclose($this->handle); } publicfunctionrun(){ if($this->mutex) $locked=Mutex::lock($this->mutex); $counter=intval(fgets($this->handle)); $counter++; rewind($this->handle); fputs($this->handle,$counter); printf("Thread#%lusays:%s\n",$this->getThreadId(),$counter); if($this->mutex) Mutex::unlock($this->mutex); } } //没有互斥锁 for($i=0;$i<50;$i++){ $threads[$i]=newCounterThread(); $threads[$i]->start(); } //加入互斥锁 $mutex=Mutex::create(true); for($i=0;$i<50;$i++){ $threads[$i]=newCounterThread($mutex); $threads[$i]->start(); } Mutex::unlock($mutex); for($i=0;$i<50;$i++){ $threads[$i]->join(); } Mutex::destroy($mutex); ?>
多线程与共享内存
在共享内存的例子中,没有使用任何锁,仍然可能正常工作,可能工作内存操作本身具备锁的功能
<?php $tmp=tempnam(__FILE__,'PHP'); $key=ftok($tmp,'a'); $shmid=shm_attach($key); $counter=0; shm_put_var($shmid,1,$counter); classCounterThreadextendsThread{ publicfunction__construct($shmid){ $this->shmid=$shmid; } publicfunctionrun(){ $counter=shm_get_var($this->shmid,1); $counter++; shm_put_var($this->shmid,1,$counter); printf("Thread#%lusays:%s\n",$this->getThreadId(),$counter); } } for($i=0;$i<100;$i++){ $threads[]=newCounterThread($shmid); } for($i=0;$i<100;$i++){ $threads[$i]->start(); } for($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove($shmid); shm_detach($shmid); ?>
五、线程同步
有些场景我们不希望thread->start()就开始运行程序,而是希望线程等待我们的命令。thread−>wait();测作用是thread−>start()后线程并不会立即运行,只有收到thread->notify();发出的信号后才运行
<?php $tmp=tempnam(__FILE__,'PHP'); $key=ftok($tmp,'a'); $shmid=shm_attach($key); $counter=0; shm_put_var($shmid,1,$counter); classCounterThreadextendsThread{ publicfunction__construct($shmid){ $this->shmid=$shmid; } publicfunctionrun(){ $this->synchronized(function($thread){ $thread->wait(); },$this); $counter=shm_get_var($this->shmid,1); $counter++; shm_put_var($this->shmid,1,$counter); printf("Thread#%lusays:%s\n",$this->getThreadId(),$counter); } } for($i=0;$i<100;$i++){ $threads[]=newCounterThread($shmid); } for($i=0;$i<100;$i++){ $threads[$i]->start(); } for($i=0;$i<100;$i++){ $threads[$i]->synchronized(function($thread){ $thread->notify(); },$threads[$i]); } for($i=0;$i<100;$i++){ $threads[$i]->join(); } shm_remove($shmid); shm_detach($shmid); ?>
六、线程池
一个Pool类
<?php classUpdateextendsThread{ public$running=false; public$row=array(); publicfunction__construct($row){ $this->row=$row; $this->sql=null; } publicfunctionrun(){ if(strlen($this->row['bankno'])>100){ $bankno=safenet_decrypt($this->row['bankno']); }else{ $error=sprintf("%s,%s\r\n",$this->row['id'],$this->row['bankno']); file_put_contents("bankno_error.log",$error,FILE_APPEND); } if(strlen($bankno)>7){ $sql=sprintf("updatememberssetbankno='%s'whereid='%s';",$bankno,$this->row['id']); $this->sql=$sql; } printf("%s\n",$this->sql); } } classPool{ public$pool=array(); publicfunction__construct($count){ $this->count=$count; } publicfunctionpush($row){ if(count($this->pool)<$this->count){ $this->pool[]=newUpdate($row); returntrue; }else{ returnfalse; } } publicfunctionstart(){ foreach($this->poolas$id=>$worker){ $this->pool[$id]->start(); } } publicfunctionjoin(){ foreach($this->poolas$id=>$worker){ $this->pool[$id]->join(); } } publicfunctionclean(){ foreach($this->poolas$id=>$worker){ if(!$worker->isRunning()){ unset($this->pool[$id]); } } } } try{ $dbh=newPDO("mysql:host=".str_replace(':',';port=',$dbhost).";dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF8\'', PDO::MYSQL_ATTR_COMPRESS=>true ) ); $sql="selectid,banknofrommembersorderbyiddesc"; $row=$dbh->query($sql); $pool=newPool(5); while($member=$row->fetch(PDO::FETCH_ASSOC)) { while(true){ if($pool->push($member)){//压入任务到池中 break; }else{//如果池已经满,就开始启动线程 $pool->start(); $pool->join(); $pool->clean(); } } } $pool->start(); $pool->join(); $dbh=null; }catch(Exception$e){ echo'[',date('H:i:s'),']','系统错误',$e->getMessage(),"\n"; } ?>
动态队列线程池
上面的例子是当线程池满后执行start统一启动,下面的例子是只要线程池中有空闲便立即创建新线程。
<?php classUpdateextendsThread{ public$running=false; public$row=array(); publicfunction__construct($row){ $this->row=$row; $this->sql=null; //print_r($this->row); } publicfunctionrun(){ if(strlen($this->row['bankno'])>100){ $bankno=safenet_decrypt($this->row['bankno']); }else{ $error=sprintf("%s,%s\r\n",$this->row['id'],$this->row['bankno']); file_put_contents("bankno_error.log",$error,FILE_APPEND); } if(strlen($bankno)>7){ $sql=sprintf("updatememberssetbankno='%s'whereid='%s';",$bankno,$this->row['id']); $this->sql=$sql; } printf("%s\n",$this->sql); } } try{ $dbh=newPDO("mysql:host=".str_replace(':',';port=',$dbhost).";dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF8\'', PDO::MYSQL_ATTR_COMPRESS=>true ) ); $sql="selectid,banknofrommembersorderbyiddesclimit50"; $row=$dbh->query($sql); $pool=array(); while($member=$row->fetch(PDO::FETCH_ASSOC)) { $id=$member['id']; while(true){ if(count($pool)<5){ $pool[$id]=newUpdate($member); $pool[$id]->start(); break; }else{ foreach($poolas$name=>$worker){ if(!$worker->isRunning()){ unset($pool[$name]); } } } } } $dbh=null; }catch(Exception$e){ echo'【',date('H:i:s'),'】','【系统错误】',$e->getMessage(),"\n"; } ?>
pthreadsPool类
<?php classWebWorkerextendsWorker{ publicfunction__construct(SafeLog$logger){ $this->logger=$logger; } protected$loger; } classWebWorkextendsStackable{ publicfunctionisComplete(){ return$this->complete; } publicfunctionrun(){ $this->worker ->logger ->log("%sexecutinginThread#%lu", __CLASS__,$this->worker->getThreadId()); $this->complete=true; } protected$complete; } classSafeLogextendsStackable{ protectedfunctionlog($message,$args=[]){ $args=func_get_args(); if(($message=array_shift($args))){ echovsprintf( "{$message}\n",$args); } } } $pool=newPool(8,\WebWorker::class,[newSafeLog()]); $pool->submit($w=newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->submit(newWebWork()); $pool->shutdown(); $pool->collect(function($work){ return$work->isComplete(); }); var_dump($pool);
七、多线程文件安全读写
LOCK_SH取得共享锁定(读取的程序)
LOCK_EX取得独占锁定(写入的程序
LOCK_UN释放锁定(无论共享或独占)
LOCK_NB如果不希望flock()在锁定时堵塞
<?php $fp=fopen("/tmp/lock.txt","r+"); if(flock($fp,LOCK_EX)){//进行排它型锁定 ftruncate($fp,0);//truncatefile fwrite($fp,"Writesomethinghere\n"); fflush($fp);//flushoutputbeforereleasingthelock flock($fp,LOCK_UN);//释放锁定 }else{ echo"Couldn'tgetthelock!"; } fclose($fp); $fp=fopen('/tmp/lock.txt','r+'); if(!flock($fp,LOCK_EX|LOCK_NB)){ echo'Unabletoobtainlock'; exit(-1); } fclose($fp); ?>
八、多线程与数据连接
pthreads与pdo同时使用是,需要注意一点,需要静态声明publicstatic$dbh;并且通过单例模式访问数据库连接。
Worker与PDO
<?php classWorkextendsStackable{ publicfunction__construct(){ } publicfunctionrun(){ $dbh=$this->worker->getConnection(); $sql="selectid,namefrommembersorderbyiddesclimit"; $row=$dbh->query($sql); while($member=$row->fetch(PDO::FETCH_ASSOC)){ print_r($member); } } } classExampleWorkerextendsWorker{ publicstatic$dbh; publicfunction__construct($name){ } /* *Therunmethodshouldjustpreparetheenvironmentfortheworkthatiscoming... */ publicfunctionrun(){ self::$dbh=newPDO('mysql:host=...;dbname=example','www',''); } publicfunctiongetConnection(){ returnself::$dbh; } } $worker=newExampleWorker("MyWorkerThread"); $work=newWork(); $worker->stack($work); $worker->start(); $worker->shutdown(); ?>
Pool与PDO
在线程池中链接数据库
#catpool.php <?php classExampleWorkerextendsWorker{ publicfunction__construct(Logging$logger){ $this->logger=$logger; } protected$logger; } /*thecollectableclassimplementsmachineryforPool::collect*/ classWorkextendsStackable{ publicfunction__construct($number){ $this->number=$number; } publicfunctionrun(){ $dbhost='db.example.com';//数据库服务器 $dbuser='example.com';//数据库用户名 $dbpw='password';//数据库密码 $dbname='example_real'; $dbh=newPDO("mysql:host=$dbhost;port=;dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF\'', PDO::MYSQL_ATTR_COMPRESS=>true, PDO::ATTR_PERSISTENT=>true ) ); $sql="selectOPEN_TIME,`COMMENT`fromMT_TRADESwhereLOGIN='".$this->number['name']."'andCMD=''and`COMMENT`='".$this->number['order'].":DEPOSIT'"; #echo$sql; $row=$dbh->query($sql); $mt_trades=$row->fetch(PDO::FETCH_ASSOC); if($mt_trades){ $row=null; $sql="UPDATEdb_example.accountsSETpaystatus='成功',deposit_time='".$mt_trades['OPEN_TIME']."'where`order`='".$this->number['order']."';"; $dbh->query($sql); #printf("%s\n",$sql); } $dbh=null; printf("runtime:%s,%s,%s\n",date('Y-m-dH:i:s'),$this->worker->getThreadId(),$this->number['order']); } } classLoggingextendsStackable{ protectedstatic$dbh; publicfunction__construct(){ $dbhost='db.example.com';//数据库服务器 $dbuser='example.com';//数据库用户名 $dbpw='password';//数据库密码 $dbname='example_real';//数据库名 self::$dbh=newPDO("mysql:host=$dbhost;port=;dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF\'', PDO::MYSQL_ATTR_COMPRESS=>true ) ); } protectedfunctionlog($message,$args=[]){ $args=func_get_args(); if(($message=array_shift($args))){ echovsprintf("{$message}\n",$args); } } protectedfunctiongetConnection(){ returnself::$dbh; } } $pool=newPool(,\ExampleWorker::class,[newLogging()]); $dbhost='db.example.com';//数据库服务器 $dbuser='example.com';//数据库用户名 $dbpw='password';//数据库密码 $dbname='db_example'; $dbh=newPDO("mysql:host=$dbhost;port=;dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF\'', PDO::MYSQL_ATTR_COMPRESS=>true ) ); $sql="select`order`,namefromaccountswheredeposit_timeisnullorderbyiddesc"; $row=$dbh->query($sql); while($account=$row->fetch(PDO::FETCH_ASSOC)) { $pool->submit(newWork($account)); } $pool->shutdown(); ?>
进一步改进上面程序,我们使用单例模式$this->worker->getInstance();全局仅仅做一次数据库连接,线程使用共享的数据库连接
<?php classExampleWorkerextendsWorker{ #publicfunction__construct(Logging$logger){ #$this->logger=$logger; #} #protected$logger; protectedstatic$dbh; publicfunction__construct(){ } publicfunctionrun(){ $dbhost='db.example.com';//数据库服务器 $dbuser='example.com';//数据库用户名 $dbpw='password';//数据库密码 $dbname='example';//数据库名 self::$dbh=newPDO("mysql:host=$dbhost;port=;dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF\'', PDO::MYSQL_ATTR_COMPRESS=>true, PDO::ATTR_PERSISTENT=>true ) ); } protectedfunctiongetInstance(){ returnself::$dbh; } } /*thecollectableclassimplementsmachineryforPool::collect*/ classWorkextendsStackable{ publicfunction__construct($data){ $this->data=$data; #print_r($data); } publicfunctionrun(){ #$this->worker->logger->log("%sexecutinginThread#%lu",__CLASS__,$this->worker->getThreadId()); try{ $dbh=$this->worker->getInstance(); #print_r($dbh); $id=$this->data['id']; $mobile=safenet_decrypt($this->data['mobile']); #printf("%d,%s\n",$id,$mobile); if(strlen($mobile)>){ $mobile=substr($mobile,-); } if($mobile=='null'){ #$sql="UPDATEmembers_digestSETmobile='".$mobile."'whereid='".$id."'"; #printf("%s\n",$sql); #$dbh->query($sql); $mobile=''; $sql="UPDATEmembers_digestSETmobile=:mobilewhereid=:id"; }else{ $sql="UPDATEmembers_digestSETmobile=md(:mobile)whereid=:id"; } $sth=$dbh->prepare($sql); $sth->bindValue(':mobile',$mobile); $sth->bindValue(':id',$id); $sth->execute(); #echo$sth->debugDumpParams(); } catch(PDOException$e){ $error=sprintf("%s,%s\n",$mobile,$id); file_put_contents("mobile_error.log",$error,FILE_APPEND); } #$dbh=null; printf("runtime:%s,%s,%s,%s\n",date('Y-m-dH:i:s'),$this->worker->getThreadId(),$mobile,$id); #printf("runtime:%s,%s\n",date('Y-m-dH:i:s'),$this->number); } } $pool=newPool(,\ExampleWorker::class,[]); #foreach(range(,)as$number){ #$pool->submit(newWork($number)); #} $dbhost='db.example.com';//数据库服务器 $dbuser='example.com';//数据库用户名 $dbpw='password';//数据库密码 $dbname='example'; $dbh=newPDO("mysql:host=$dbhost;port=;dbname=$dbname",$dbuser,$dbpw,array( PDO::MYSQL_ATTR_INIT_COMMAND=>'SETNAMES\'UTF\'', PDO::MYSQL_ATTR_COMPRESS=>true ) ); #print_r($dbh); #$sql="selectid,mobilefrommemberswhereid<:id"; #$sth=$dbh->prepare($sql); #$sth->bindValue(':id',); #$sth->execute(); #$result=$sth->fetchAll(); #print_r($result); # #$sql="UPDATEmembers_digestSETmobile=:mobilewhereid=:id"; #$sth=$dbh->prepare($sql); #$sth->bindValue(':mobile','aa'); #$sth->bindValue(':id',''); #echo$sth->execute(); #echo$sth->queryString; #echo$sth->debugDumpParams(); $sql="selectid,mobilefrommembersorderbyidasc";//limit"; $row=$dbh->query($sql); while($members=$row->fetch(PDO::FETCH_ASSOC)) { #$order=$account['order']; #printf("%s\n",$order); //print_r($members); $pool->submit(newWork($members)); #unset($account['order']); } $pool->shutdown(); ?>
多线程中操作数据库总结
总的来说pthreads仍然处在发展中,仍有一些不足的地方,我们也可以看到pthreads的git在不断改进这个项目
数据库持久链接很重要,否则每个线程都会开启一次数据库连接,然后关闭,会导致很多链接超时。
<?php $dbh=newPDO('mysql:host=localhost;dbname=test',$user,$pass,array( PDO::ATTR_PERSISTENT=>true )); ?>
关于phppthreads多线程的安装与使用的相关知识,就先给大家介绍到这里,后续还会持续更新。