JAVA通过Gearman实现MySQL到Redis的数据同步(异步复制) (3)

  问题:config类为spring注入的配置文件类,在worker.addFunction中,如果通过config类的属性,并且属性是从配置文件来的就会有问题。不知道为啥,写死就是OK的。此类连接远程的gearman job server。

  

  jar包需要添加到本地jar仓库:

mvn install:install-file -Dfile=C:\software\java-gearman-service-0.6.6.jar -DgroupId=org.gearman.jgs -DartifactId=java-gearman-service -Dversion=0.6.6 -Dpackaging=jar

 

import java.util.concurrent.TimeUnit;

 

import org.gearman.Gearman;

import org.gearman.GearmanFunction;

import org.gearman.GearmanFunctionCallback;

import org.gearman.GearmanServer;

import org.gearman.GearmanWorker;

 

/**

 * *ECHO_HOST = "192.168.125.131"为安装了Gearman并开启geramand服务的主机地址

 *int ECHO_PORT = 4730默认端口为4730

 *

 * @author Administrator

 *

 */

public class EchoWorker implements GearmanFunction {

 

// function name

public static final String ECHO_FUNCTION_NAME = "syncToRedis";

 

// job server地址

public static final String ECHO_HOST = "192.168.1.245";

 

// job server监听的端口

public static final int ECHO_PORT = 4730;

 

public static void main(String[] args) {

// 创建一个Gearman实例

Gearman gearman = Gearman.createGearman();

/*

 * 创建一个jobserver

 * 

 * Parameter 1: job server的IP地址 Parameter 2: job server监听的端口

 * 

 * job server收到client的job,并将其分发给注册worker

 * 

 */

GearmanServer server = gearman.createGearmanServer(EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);

// 创建一个Gearman的worker

GearmanWorker worker = gearman.createGearmanWorker(); // 正题来了,创建work节点。

worker.setReconnectPeriod(2, TimeUnit.SECONDS); // 设置超时重连时间

worker.setMaximumConcurrency(5); // 最大并发数

// 告诉工人如何执行工作(主要实现了GearmanFunction接口)

worker.addFunction(EchoWorker.ECHO_FUNCTION_NAME, new EchoWorker());

// worker连接服务器

worker.addServer(server);

}

 

@Override

public byte[] work(String function, byte[] data, GearmanFunctionCallback callback) throws Exception {

// work方法实现了GearmanFunction接口中的work方法,本实例中进行了字符串的反写

if (data != null) {

String str = new String(data);

System.out.println(str);

StringBuffer sb = new StringBuffer(str);

return sb.reverse().toString().getBytes();

} else {

return "未接收到data".getBytes();

}

}

}

 

 

import org.gearman.Gearman;  

import org.gearman.GearmanClient;  

import org.gearman.GearmanJobEvent;  

import org.gearman.GearmanJobReturn;  

import org.gearman.GearmanServer;  

  

public class EchoClient {  

    public static void main(String... args) throws InterruptedException {  

            //创建一个Gearman实例  

            Gearman gearman = Gearman.createGearman();  

            //创建一个Gearman client               

            GearmanClient client = gearman.createGearmanClient();  

            /*  

             * 创建一个jobserver  

             *   

             * Parameter 1: job server的IP地址  

             * Parameter 2: job server监听的端口  

             *   

             *job server收到client的job,并将其分发给注册worker  

             *  

             */  

            GearmanServer server = gearman.createGearmanServer(  

                            EchoWorker.ECHO_HOST, EchoWorker.ECHO_PORT);  

             // 告诉客户端,提交工作时它可以连接到该服务器  

            client.addServer(server);  

            /*  

             * 向job server提交工作  

             *   

             * Parameter 1: gearman function名字  

             * Parameter 2: 传送给job server和worker的数据  

             *   

             * GearmanJobReturn返回job发热结果  

             */  

            GearmanJobReturn jobReturn = client.submitJob(  

                            EchoWorker.ECHO_FUNCTION_NAME, ("Hello World!").getBytes());  

            //遍历作业事件,直到我们打到最后文件               

            while (!jobReturn.isEOF()) {  

  

                    //下一个作业事件  

                    GearmanJobEvent event = jobReturn.poll();  

  

                    switch (event.getEventType()) {  

  

                    case GEARMAN_JOB_SUCCESS:     //job执行成功  

                            System.out.println(new String(event.getData()));  

                            break;  

                    case GEARMAN_SUBMIT_FAIL:     //job提交失败  

                          

                    case GEARMAN_JOB_FAIL:        //job执行失败  

                            System.err.println(event.getEventType() + ": "  

                                            + new String(event.getData()));  

                    default:  

                    }  

            }  

            //关闭  

            gearman.shutdown();  

    }  

}  

 

php方案:https://www.tuicool.com/articles/B7Jjaa

内容版权声明:除非注明,否则皆为本站原创文章。

转载注明出处:https://www.heiqu.com/zwzxfx.html