apache zookeeper使用方法实例详解
本文涉及了ApacheZookeeper使用方法实例详解的相关知识,接下来我们就看看具体内容。
简介
ApacheZookeeper是由ApacheHadoop的Zookeeper子项目发展而来,现在已经成为了Apache的顶级项目。Zookeeper为分布式系统提供了高效可靠且易于使用的协同服务,它可以为分布式应用提供相当多的服务,诸如统一命名服务,配置管理,状态同步和组服务等。Zookeeper接口简单,开发人员不必过多地纠结在分布式系统编程难于处理的同步和一致性问题上,你可以使用Zookeeper提供的现成(off-the-shelf)服务来实现分布式系统的配置管理,组管理,Leader选举等功能。
英文原文地址:http://zookeeper.apache.org/doc/current/javaExample.html
一个简单的ZookeeperWatch客户端
为了介绍ZookeeperJavaAPI的基本用法,本文将带你如何一步一步实现一个功能简单的 Zookeeper客户端。该Zookeeper客户端会监视一个你指定Zookeeper节点Znode,当被监视的节点发生变化时,客户端会启动或者停止某一程序。
基本要求
该客户端具备四个基本要求:
(1)客户端所带参数:
(2)Zookeeper服务地址。
(3)被监视的Znode节点名称。
(4)可执行程序及其所带的参数
客户端会获取被监视Znode节点的数据并启动你所指定的可执行程序。如果被监视的Znode节点发生改变,客户端重新获取其内容并再次启动你所指定的可执行程序。如果被监视的Znode节点消失,客户端会杀死可执行程序。
程序设计
一般而言,Zookeeper应用程序分为两部分,其中一部分维护与服务器端的连接,另外一部分监视Znode节点的数据。在本程序中,Executor类负责维护Zookeeper连接,DataMonitor类监视Zookeeper目录树中的数据,同时,Executor包含了主线程和程序主要的执行逻辑,它负责少量的用户交互,以及与可执行程序的交互,该可执行程序接受你向它传入的参数,并且会根据被监视的Znode节点的状态变化停止或重启。
Executor类
Executor对象是本例程最基本的“容器”,它包括Zookeeper对象和DataMonitor对象。
publicstaticvoidmain(String[]args){ if(args.length<4){ System.err .println("USAGE:ExecutorhostPortznodefilenameprogram[args...]"); System.exit(2); } StringhostPort=args[0]; Stringznode=args[1]; Stringfilename=args[2]; Stringexec[]=newString[args.length-3]; System.arraycopy(args,3,exec,0,exec.length); try{ newExecutor(hostPort,znode,filename,exec).run(); }catch(Exceptione){ e.printStackTrace(); } } publicExecutor(StringhostPort,Stringznode,Stringfilename, Stringexec[])throwsKeeperException,IOException{ this.filename=filename; this.exec=exec; zk=newZooKeeper(hostPort,3000,this); dm=newDataMonitor(zk,znode,null,this); } publicvoidrun(){ try{ synchronized(this){ while(!dm.dead){ wait(); } } }catch(InterruptedExceptione){ } }
回忆一下Executor的任务是根据Zookeeper中Znode节点状态改变所触发的事件来启动和停止你在命令行指定的可执行程序,在上面的代码你可以看到,Executor类在其构造函数中实例化Zookeeper对象时,将其自身的引用作为Watch参数传递给Zookeeper的构造函数,同时它也将其自身的引用作为DataMonitorListener参数传递给DataMonitor的构造函数。Executor本身实现了以下接口:
publicclassExecutorimplementsWatcher,Runnable,DataMonitor.DataMonitorListener{ ...
Watcher接口是在ZooKeeperJavaAPI中定义的。ZooKeeper用它来与“容器”(此处“容器”与上面的Executor类相似)进行通信,Watcher只支持一个方法,即process(),ZooKeeper用该函数来处理主线程可能感兴趣的事件,例如Zookeeper连接或会话的状态,本例中的“容器”Executor只是简单地把事件向下传递给DataMonitor,具体如何处理事件是由DataMonitor决定的。本文只是简单地描述了如何使用Watcher,通常情况下,Executor或与Executor类似的对象拥有与Zookeeper服务端的连接,但它可以将事件传递给其他对象,并有其它的对象处理该事件。
publicvoidprocess(WatchedEventevent){ dm.process(event); }
DataMonitorListener接口本身不是ZookeeperAPI的一部分,它完全是一个自定义的接口,可以说是专门为本程序设计的。DataMonitor对象使用该接口和“容器”(即Executor类)进行通信,DataMonitorListener接口如下:
publicinterfaceDataMonitorListener{ /** *Theexistencestatusofthenodehaschanged. */ voidexists(bytedata[]); /** *TheZooKeepersessionisnolongervalid. * *@paramrc *theZooKeeperreasoncode */ voidclosing(intrc); }
该接口在DataMonitor中定义,Executor类实现该接口,当Executor.exists()被调用的时候,Executor决定是否启动或停止事先指定的应用程序(回忆一下前文所说的,当Znode消失时Zookeeper客户端会杀死该可执行程序)。
当Executor.closing()被调用的时候,Executor会根据Zookeeper连接永久性地消失来决定是否关闭自己。
你或许已经猜到,DataMonitor对象根据Zookeeper状态变化来调用这些方法吧?
以下是Executor类中实现DataMonitorListener.exists()和DataMonitorListener.closing()的代码:
publicvoidexists(byte[]data){ if(data==null){ if(child!=null){ System.out.println("Killingprocess"); child.destroy(); try{ child.waitFor(); }catch(InterruptedExceptione){ } } child=null; }else{ if(child!=null){ System.out.println("Stoppingchild"); child.destroy(); try{ child.waitFor(); }catch(InterruptedExceptione){ e.printStackTrace(); } } try{ FileOutputStreamfos=newFileOutputStream(filename); fos.write(data); fos.close(); }catch(IOExceptione){ e.printStackTrace(); } try{ System.out.println("Startingchild"); child=Runtime.getRuntime().exec(exec); newStreamWriter(child.getInputStream(),System.out); newStreamWriter(child.getErrorStream(),System.err); }catch(IOExceptione){ e.printStackTrace(); } } } publicvoidclosing(intrc){ synchronized(this){ notifyAll(); } }
DataMonitor类
DataMonitor类是本程序Zookeeper逻辑的核心,它差不多是异步的,并由事件驱动的。DataMonitor构造函数如下:
publicDataMonitor(ZooKeeperzk,Stringznode,WatcherchainedWatcher, DataMonitorListenerlistener){ this.zk=zk; this.znode=znode; this.chainedWatcher=chainedWatcher; this.listener=listener; //Getthingsstartedbycheckingifthenodeexists.Wearegoing //tobecompletelyeventdriven zk.exists(znode,true,this,null); }
调用ZooKeeper.exists()检查指定的Znode是否存在,并设置监视,传递自身引用作为回调对象,在某种意义上,在watch触发时就会引起真实的处理流程。
当ZooKeeper.exists()操作在服务器端完成时,ZooKeeperAPI会在客户端调用completioncallback:
publicvoidprocessResult(intrc,Stringpath,Objectctx,Statstat){ booleanexists; switch(rc){ caseCode.Ok: exists=true; break; caseCode.NoNode: exists=false; break; caseCode.SessionExpired: caseCode.NoAuth: dead=true; listener.closing(rc); return; default: //Retryerrors zk.exists(znode,true,this,null); return; } byteb[]=null; if(exists){ try{ b=zk.getData(znode,false,null); }catch(KeeperExceptione){ //Wedon'tneedtoworryaboutrecoveringnow.Thewatch //callbackswillkickoffanyexceptionhandling e.printStackTrace(); }catch(InterruptedExceptione){ return; } } if((b==null&&b!=prevData) ||(b!=null&&!Arrays.equals(prevData,b))){ listener.exists(b); prevData=b; } }
上述代码首先检查Znode是否存在,以及其他重大的不可恢复的错误。如果文件(或者Znode)存在,它将从Znode获取数据,如果状态发生变化再调用Executor的exists()回调函数。注意,getData函数本省必须要做任何的异常处理,因为本身就有监视可以处理任何错误:如果节点在调用ZooKeeper.getData()之前被删除,ZooKeeper.exists()就会触发回调函数,如果存在通信错误,在连接上的监视会在该连接重建之前触发相应的事件,同时引发相应的处理。
最后,DataMonitor处理监视事件的代码如下:
publicvoidprocess(WatchedEventevent){ Stringpath=event.getPath(); if(event.getType()==Event.EventType.None){ //Wearearebeingtoldthatthestateofthe //connectionhaschanged switch(event.getState()){ caseSyncConnected: //Inthisparticularexamplewedon'tneedtodoanything //here-watchesareautomaticallyre-registeredwith //serverandanywatchestriggeredwhiletheclientwas //disconnectedwillbedelivered(inorderofcourse) break; caseExpired: //It'sallover dead=true; listener.closing(KeeperException.Code.SessionExpired); break; } }else{ if(path!=null&&path.equals(znode)){ //Somethinghaschangedonthenode,let'sfindout zk.exists(znode,true,this,null); } } if(chainedWatcher!=null){ chainedWatcher.process(event); } }
如果客户端Zookeeper程序在会话失效时(Expiredevent)重新建立了通信信道(SyncConnectedevent),所有的会话监视会自动和服务器进行重连,(Zookeeper3.0.0以上版本会重置之前设置的监视).更多编程指南请参见ZooKeeperWatches。当DataMonitor获得了指定Znode的事件后,它将调用ZooKeeper.exists()来决定究竟发生了什么。
完整的程序:
Executor.java:
/** *AsimpleexampleprogramtouseDataMonitortostartand *stopexecutablesbasedonaznode.Theprogramwatchesthe *specifiedznodeandsavesthedatathatcorrespondstothe *znodeinthefilesystem.Italsostartsthespecifiedprogram *withthespecifiedargumentswhentheznodeexistsandkills *theprogramiftheznodegoesaway. */ importjava.io.FileOutputStream; importjava.io.IOException; importjava.io.InputStream; importjava.io.OutputStream; importorg.apache.zookeeper.KeeperException; importorg.apache.zookeeper.WatchedEvent; importorg.apache.zookeeper.Watcher; importorg.apache.zookeeper.ZooKeeper; publicclassExecutor implementsWatcher,Runnable,DataMonitor.DataMonitorListener { Stringznode; DataMonitordm; ZooKeeperzk; Stringfilename; Stringexec[]; Processchild; publicExecutor(StringhostPort,Stringznode,Stringfilename, Stringexec[])throwsKeeperException,IOException{ this.filename=filename; this.exec=exec; zk=newZooKeeper(hostPort,3000,this); dm=newDataMonitor(zk,znode,null,this); } /** *@paramargs */ publicstaticvoidmain(String[]args){ if(args.length<4){ System.err .println("USAGE:ExecutorhostPortznodefilenameprogram[args...]"); System.exit(2); } StringhostPort=args[0]; Stringznode=args[1]; Stringfilename=args[2]; Stringexec[]=newString[args.length-3]; System.arraycopy(args,3,exec,0,exec.length); try{ newExecutor(hostPort,znode,filename,exec).run(); }catch(Exceptione){ e.printStackTrace(); } } /*************************************************************************** *Wedoprocessanyeventsourselves,wejustneedtoforwardthemon. * *@seeorg.apache.zookeeper.Watcher#process(org.apache.zookeeper.proto.WatcherEvent) */ publicvoidprocess(WatchedEventevent){ dm.process(event); } publicvoidrun(){ try{ synchronized(this){ while(!dm.dead){ wait(); } } }catch(InterruptedExceptione){ } } publicvoidclosing(intrc){ synchronized(this){ notifyAll(); } } staticclassStreamWriterextendsThread{ OutputStreamos; InputStreamis; StreamWriter(InputStreamis,OutputStreamos){ this.is=is; this.os=os; start(); } publicvoidrun(){ byteb[]=newbyte[80]; intrc; try{ while((rc=is.read(b))>0){ os.write(b,0,rc); } }catch(IOExceptione){ } } } publicvoidexists(byte[]data){ if(data==null){ if(child!=null){ System.out.println("Killingprocess"); child.destroy(); try{ child.waitFor(); }catch(InterruptedExceptione){ } } child=null; }else{ if(child!=null){ System.out.println("Stoppingchild"); child.destroy(); try{ child.waitFor(); }catch(InterruptedExceptione){ e.printStackTrace(); } } try{ FileOutputStreamfos=newFileOutputStream(filename); fos.write(data); fos.close(); }catch(IOExceptione){ e.printStackTrace(); } try{ System.out.println("Startingchild"); child=Runtime.getRuntime().exec(exec); newStreamWriter(child.getInputStream(),System.out); newStreamWriter(child.getErrorStream(),System.err); }catch(IOExceptione){ e.printStackTrace(); } } } }
DataMonitor.java:
/** *AsimpleclassthatmonitorsthedataandexistenceofaZooKeeper *node.ItusesasynchronousZooKeeperAPIs. */ importjava.util.Arrays; importorg.apache.zookeeper.KeeperException; importorg.apache.zookeeper.WatchedEvent; importorg.apache.zookeeper.Watcher; importorg.apache.zookeeper.ZooKeeper; importorg.apache.zookeeper.AsyncCallback.StatCallback; importorg.apache.zookeeper.KeeperException.Code; importorg.apache.zookeeper.data.Stat; publicclassDataMonitorimplementsWatcher,StatCallback{ ZooKeeperzk; Stringznode; WatcherchainedWatcher; booleandead; DataMonitorListenerlistener; byteprevData[]; publicDataMonitor(ZooKeeperzk,Stringznode,WatcherchainedWatcher, DataMonitorListenerlistener){ this.zk=zk; this.znode=znode; this.chainedWatcher=chainedWatcher; this.listener=listener; //Getthingsstartedbycheckingifthenodeexists.Wearegoing //tobecompletelyeventdriven zk.exists(znode,true,this,null); } /** *OtherclassesusetheDataMonitorbyimplementingthismethod */ publicinterfaceDataMonitorListener{ /** *Theexistencestatusofthenodehaschanged. */ voidexists(bytedata[]); /** *TheZooKeepersessionisnolongervalid. * *@paramrc *theZooKeeperreasoncode */ voidclosing(intrc); } publicvoidprocess(WatchedEventevent){ Stringpath=event.getPath(); if(event.getType()==Event.EventType.None){ //Wearearebeingtoldthatthestateofthe //connectionhaschanged switch(event.getState()){ caseSyncConnected: //Inthisparticularexamplewedon'tneedtodoanything //here-watchesareautomaticallyre-registeredwith //serverandanywatchestriggeredwhiletheclientwas //disconnectedwillbedelivered(inorderofcourse) break; caseExpired: //It'sallover dead=true; listener.closing(KeeperException.Code.SessionExpired); break; } }else{ if(path!=null&&path.equals(znode)){ //Somethinghaschangedonthenode,let'sfindout zk.exists(znode,true,this,null); } } if(chainedWatcher!=null){ chainedWatcher.process(event); } } publicvoidprocessResult(intrc,Stringpath,Objectctx,Statstat){ booleanexists; switch(rc){ caseCode.Ok: exists=true; break; caseCode.NoNode: exists=false; break; caseCode.SessionExpired: caseCode.NoAuth: dead=true; listener.closing(rc); return; default: //Retryerrors zk.exists(znode,true,this,null); return; } byteb[]=null; if(exists){ try{ b=zk.getData(znode,false,null); }catch(KeeperExceptione){ //Wedon'tneedtoworryaboutrecoveringnow.Thewatch //callbackswillkickoffanyexceptionhandling e.printStackTrace(); }catch(InterruptedExceptione){ return; } } if((b==null&&b!=prevData) ||(b!=null&&!Arrays.equals(prevData,b))){ listener.exists(b); prevData=b; } } }
总结
本文关于ApacheZookeeper使用方法实例详解的介绍就到这里,希望对大家有所帮助。如果有什么问题可以留言,小编会及时回复大家的,感谢大家对毛票票网站的支持!