文章目录
- 一、快速开始
- 1.1 SOFARPC
- 1.2 基于SOFABoot
- 二、注册中心
- 三、通讯协议
- 2.1 Bolt
- 基本发布
- 调用方式
- 超时控制
- 协议泛化调用
- 序列化协议
- 自定义线程池
- 2.2 RESTful
- 基本使用
- 2.3 其他协议
- 四、架构
- 附录
官方样例下载地址-sofa-boot-guides
可查看 SOFARPC 方式快速入门
一、快速开始
1.1 SOFARPC
导入如下依赖
<dependency><groupId>com.alipay.sofa</groupId><artifactId>sofa-rpc-all</artifactId><version>最新版本</version>
</dependency>
版本查看
创建一个接口
public interface HelloService {String sayHello(String string);
}
实现它
public class HelloServiceImpl implements HelloService {@Overridepublic String sayHello(String string) {System.out.println("Server receive: " + string);return "hello " + string + " !";}
}
服务器代码
import com.alipay.sofa.rpc.config.ProviderConfig;
import com.alipay.sofa.rpc.config.ServerConfig;public class QuickStartServer {public static void main(String[] args) {ServerConfig serverConfig = new ServerConfig().setProtocol("bolt") // 设置一个协议,默认bolt.setPort(12200) // 设置一个端口,默认12200.setDaemon(false); // 非守护线程ProviderConfig<HelloService> providerConfig = new ProviderConfig<HelloService>().setInterfaceId(HelloService.class.getName()) // 指定接口.setRef(new HelloServiceImpl()) // 指定实现.setServer(serverConfig); // 指定服务端providerConfig.export(); // 发布服务}
}
客户端代码
import com.alipay.sofa.rpc.config.ConsumerConfig;public class QuickStartClient {public static void main(String[] args) {ConsumerConfig<HelloService> consumerConfig = new ConsumerConfig<HelloService>().setInterfaceId(HelloService.class.getName()) // 指定接口.setProtocol("bolt") // 指定协议.setDirectUrl("bolt://127.0.0.1:12200"); // 指定直连地址// 生成代理类HelloService helloService = consumerConfig.refer();while (true) {System.out.println(helloService.sayHello("world"));try {Thread.sleep(2000);} catch (Exception e) {}}}
}
1.2 基于SOFABoot
引入如下依赖
<parent><groupId>com.alipay.sofa</groupId><artifactId>sofaboot-dependencies</artifactId><version>3.2.0</version></parent><dependencies><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-web</artifactId></dependency><dependency><groupId>com.alipay.sofa</groupId><artifactId>rpc-sofa-boot-starter</artifactId></dependency></dependencies>
以及基本的SpringBoot配置
spring.application.name=rpcApp
logging.path=./logs
server.port=8080
服务接口
package com.SOFABootRPCTest.service;public interface AnnotationService {String sayAnnotation(String string);
}
服务实现接口
在前文中,我们用@Configuration、@Bean+@SofaService来完成的发布与引用,我们这里就直接用@Service+@SofaService的方式完成
package com.SOFABootRPCTest.serviceImpl;import com.SOFABootRPCTest.service.AnnotationService;
import com.alipay.sofa.runtime.api.annotation.SofaService;
import com.alipay.sofa.runtime.api.annotation.SofaServiceBinding;
import org.springframework.stereotype.Service;@SofaService(interfaceType = AnnotationService.class, uniqueId = "annotationServiceImpl", bindings = { @SofaServiceBinding(bindingType = "bolt")})
@Service
public class AnnotationServiceImpl implements AnnotationService {@Overridepublic String sayAnnotation(String string) {return string;}
}
bolt这里是协议,我们在前文中这个地方是jvm。不同协议用法不同。可以看到,默认是jvm协议。
上面代表我们的接口就正式发布了。可以通过互联网进行访问。
我们现在要调用我们发布的接口,基本和jvm一样。jvmFirst默认为True,也就是优先使用jvm协议。
package com.client;import com.alipay.sofa.runtime.api.annotation.SofaReference;
import com.alipay.sofa.runtime.api.annotation.SofaReferenceBinding;
import com.service.AnnotationService;
import org.springframework.stereotype.Component;@Component
public class AnnotationClientImpl {@SofaReference(interfaceType = AnnotationService.class, jvmFirst = false,uniqueId = "annotationServiceImpl",binding = @SofaReferenceBinding(bindingType = "bolt"))private AnnotationService annotationService;public String sayClientAnnotation(String str) {return annotationService.sayAnnotation(str);}
}
使用起来很像Fegin直接调用方法即可。
文档给的代码如下。也可以用 阿里官方给的 SOFARPC快速入门里面的代码。
将下列main中代码放入当前MainApplication中,即可。
package com.SOFABootRPCTest.client;import com.SOFABootRPCTest.Contoller.AnnotationClientImpl;
import org.springframework.boot.SpringApplication;
import org.springframework.context.ApplicationContext;public class AnotationClientApplication {public static void main(String[] args) {// 临时切换地址,避免端口重复System.setProperty("server.port", "8081");SpringApplication springApplication = new SpringApplication(AnotationClientApplication.class);ApplicationContext applicationContext = springApplication.run(args);AnnotationClientImpl annotationService = applicationContext.getBean(AnnotationClientImpl.class);String result = annotationService.sayClientAnnotation("annotation");System.out.println("invoke result:" + result);}
}
二、注册中心
笔者使用Nacos作为注册中心,其他注册中心可参考注册中心
使用时注意版本要求
图方便可以用本地文件作为注册中心
三、通讯协议
我们此处仅尝试注解方式的发布引用。
可以自行参考协议基本使用
可以创建如下结构项目
2.1 Bolt
基本发布
api中创建
package com.serviceApi;public interface SampleService {String HelloWorld();
}
producer中创建
package com.producer.serviceImpl;import com.alipay.sofa.runtime.api.annotation.SofaService;
import com.alipay.sofa.runtime.api.annotation.SofaServiceBinding;
import com.serviceApi.SampleService;
import org.springframework.stereotype.Service;@Service
@SofaService(uniqueId = "sampleService",bindings = {@SofaServiceBinding(bindingType = "bolt")})
public class SampleServiceImpl implements SampleService {@Overridepublic String HelloWorld() {System.out.println("==========================\nhello world!\n==========================\n");return "hello world";}
}
consumer中使用
package com.consumer;import com.alipay.sofa.runtime.api.annotation.SofaReference;
import com.alipay.sofa.runtime.api.annotation.SofaReferenceBinding;
import com.serviceApi.SampleService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest(classes = ConsumerApplication.class, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class test {@SofaReference(uniqueId = "sampleService",binding = @SofaReferenceBinding(bindingType = "bolt"))private SampleService sampleService;@Testpublic void testRPC(){sampleService.HelloWorld();}
}
我们启动服务producer服务,然后consumer使用Test(上述代码)
可以看到成功调用
使用端口可通过如下配置进行修改
com.alipay.sofa.rpc.bolt.port=端口号
调用方式
- 同步
见前文
- 异步
异步调用的方式下,客户端发起调用后不会等到服务端的结果,继续执行后面的业务逻辑。服务端返回的结果会被 SOFARPC 缓存,当客户端需要结果的时候,再主动调用 API 获取。
异步方式如下:
package com.consumer;import com.alipay.sofa.rpc.api.future.SofaResponseFuture;
import com.alipay.sofa.runtime.api.annotation.SofaReference;
import com.alipay.sofa.runtime.api.annotation.SofaReferenceBinding;
import com.serviceApi.SampleService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest(classes = ConsumerApplication.class, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class test {@SofaReference(uniqueId = "sampleService",binding = @SofaReferenceBinding(bindingType = "bolt",invokeType = "future"))private SampleService sampleService;@Testpublic void testRPC(){try {sampleService.HelloWorld();// true表示清除String result = (String) SofaResponseFuture.getResponse(10000, true);System.out.println(result);}catch (InterruptedException e) {throw new RuntimeException(e);}}
}
注意,观察源码会发现内部是用ThreadLocalMap,如果一个线程发布了两个异步请求,但其间没有获取结果,其结果后到的一项会覆盖前一项。
我们进行如下测试。
为SampleService再添一个实例
package com.producer.serviceImpl;import com.alipay.sofa.runtime.api.annotation.SofaService;
import com.alipay.sofa.runtime.api.annotation.SofaServiceBinding;
import com.serviceApi.SampleService;
import org.springframework.stereotype.Service;@Service
@SofaService(uniqueId = "sampleService2",bindings = @SofaServiceBinding(bindingType = "bolt"))
public class SampleServiceImpl2 implements SampleService {@Overridepublic String HelloWorld() {System.out.println("==========================\nhello Java!\n==========================\n");return "hello Java!";}
}
我们将测试部分改成这样的代码(睡眠了一秒为了保证前一项执行完毕)
package com.consumer;import com.alipay.sofa.rpc.api.future.SofaResponseFuture;
import com.alipay.sofa.runtime.api.annotation.SofaReference;
import com.alipay.sofa.runtime.api.annotation.SofaReferenceBinding;
import com.serviceApi.SampleService;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;@RunWith(SpringRunner.class)
@SpringBootTest(classes = ConsumerApplication.class, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class test {@SofaReference(uniqueId = "sampleService",binding = @SofaReferenceBinding(bindingType = "bolt",invokeType = "future"))private SampleService sampleService;@SofaReference(uniqueId = "sampleService2",binding = @SofaReferenceBinding(bindingType = "bolt",invokeType = "future"))private SampleService sampleService2;@Testpublic void testRPC(){try {sampleService.HelloWorld();Thread.sleep(1000);sampleService2.HelloWorld();String result = (String) SofaResponseFuture.getResponse(10000, true);System.out.println(result);}catch (InterruptedException e) {throw new RuntimeException(e);}}
}
服务器端没有问题
结果却输出了hello Java
如果我们去两次会报错,说明确实覆盖了。(当然如果是false就不会被清除)
public void testRPC(){try {sampleService.HelloWorld();Thread.sleep(1000);sampleService2.HelloWorld();String result = (String) SofaResponseFuture.getResponse(10000, true);String result2 = (String) SofaResponseFuture.getResponse(10000, true);System.out.println(result);System.out.println(result2);}catch (InterruptedException e) {throw new RuntimeException(e);}}
我们可以发布后交给Java原来的异步调用来管理。
Future future = SofaResponseFuture.getFuture(true);future.get(10000, TimeUnit.MILLISECONDS);
- 回调
需要实现SofaResponseCallback类
package com.consumer.callback;import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.RequestBase;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;;@Configuration
public class CallbackConfiguration {Logger log = LoggerFactory.getLogger(CallbackConfiguration.class);@Bean("sampleCallback")public SofaResponseCallback getSampleCallback(){return new SofaResponseCallback() {@Overridepublic void onAppResponse(Object o, String s, RequestBase requestBase) {log.info("调用回调");log.info("{}",o.toString());}@Overridepublic void onAppException(Throwable throwable, String s, RequestBase requestBase) {System.out.println("APP错误");}@Overridepublic void onSofaException(SofaRpcException e, String s, RequestBase requestBase) {System.out.println("SOFA错误");}};}
}
SOFARPC 为设置回调接口提供了两种方式,
- Callback Class:Callback Class 的方式直接设置回调的类名,SOFARPC 会通过调用回调类的默认构造函数的方式生成回调类的实例。
- Callback Ref:Callback Ref 的方式则为用户直接提供回调类的实例。callbackRef 属性的值需要是回调类的 Bean 名称。
- 单向
当客户端发送请求后不关心服务端返回的结果时,可以使用单向的调用方式,这种方式会在发起调用后立即返回 null,并且忽略服务端的返回结果。
使用单向的方式只需要将调用方式设置为 oneway 即可,设置方式和将调用方式设置为 future 或者 callback 一样,这里不再重复讲述,可以参考上面的文档中提供的设置方式。
需要特别注意的是,由于单向的调用方式会立即返回,所以所有的超时设置在单向的情况下都是无效的。
超时控制
使用 Bolt 协议进行通信的时候,SOFARPC 的超时时间默认为 3 秒,用户可以在引用服务的时候去设置超时时间,又分别可以在服务以及方法的维度设置超时时间,SOFARPC 的超时时间的设置的单位都为毫秒。
可自行查看-Bolt 协议超时控制
- 服务维度
如果需要在发布服务的时候在服务维度设置超时时间,设置对应的 timeout 参数到对应的值即可。
@SofaReference(binding = @SofaReferenceBinding(bindingType = "bolt", timeout = 2000))
private SampleService sampleService;
- 方法维度
注解模式暂无
协议泛化调用
发布可以不变
xml无法读入时,可以尝试如下方式(*代表查找所有,无*代表使用找到的第一个)
@ImportResource({ "classpath*:rpc-starter-example.xml" })
在引用时需要配置XML
<beans xmlns="http://www.springframework.org/schema/beans"xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"xmlns:sofa="http://sofastack.io/schema/sofaboot"xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://sofastack.io/schema/sofaboot http://sofastack.io/schema/sofaboot.xsd"default-autowire="byName"><sofa:reference jvm-first="false" id="sampleGenericServiceReference" interface="com.serviceApi.SampleService"><sofa:binding.bolt><sofa:global-attrs generic-interface="com.serviceApi.SampleService"/></sofa:binding.bolt>
</sofa:reference></beans>
@Testpublic void testRPC() {GenericService sampleGenericServiceReference = (GenericService) SpringUtil.getBean("sampleGenericServiceReference");// 需要传入方法名,方法类型,方法参数String result = sampleGenericServiceReference.$genericInvoke("HelloWorld",new String[]{},new Object[]{},String.class);System.out.println(result);}
其他规则可参考官方给出的例子
/**
* Java Bean
*/
public class People {private String name;private int age;// getters and setters
}/*** 服务方提供的接口*/
interface SampleService {String hello(String arg);People hello(People people);String[] hello(String[] args);
}/*** 客户端*/
public class ConsumerClass {GenericService genericService;public void do() {// 1. $invoke仅支持方法参数类型在当前应用的 ClassLoader 中存在的情况genericService.$invoke("hello", new String[]{ String.class.getName() }, new Object[]{"I'm an arg"});// 2. $genericInvoke支持方法参数类型在当前应用的 ClassLoader 中不存在的情况。// 2.1 构造参数GenericObject genericObject = new GenericObject("com.alipay.sofa.rpc.test.generic.bean.People"); // 构造函数中指定全路径类名genericObject.putField("name", "Lilei"); // 调用putField,指定field值genericObject.putField("age", 15);// 2.2 进行调用,不指定返回类型,返回结果类型为GenericObjectObject obj = genericService.$genericInvoke("hello", new String[]{"com.alipay.sofa.rpc.test.generic.bean.People"}, new Object[] { genericObject });Assert.assertTrue(obj.getClass == GenericObject.class);// 如果返回的是泛化的自定义对象,可以如下方式取出。String name = result.getField("name");// 2.3 进行调用,指定返回类型People people = genericService.$genericInvoke("hello", new String[]{"com.alipay.sofa.rpc.test.generic.bean.People"}, new Object[] { genericObject }, People.class);// 2.4 进行调用,参数类型是数组类型String[] result = (String[]) proxy.$genericInvoke("hello", new String[]{new String[0].getClass().getName()}, new Object[]{ new String[]{"args"} });}
}
序列化协议
SOFARPC 可以在使用 Bolt 通信协议的情况下,可以选择不同的序列化协议,
- hessian2
- protobuf
默认的情况下,SOFARPC 使用 hessian2 作为序列化协议
配置
@SofaService(bindings = @SofaServiceBinding(bindingType = "bolt",serializeType = "protobuf"))
public class SampleServiceImpl implements SampleService {
}@SofaReference(binding = @SofaReferenceBinding(bindingType = "bolt", serializeType = "protobuf"))
private SampleService sampleService;
自定义线程池
SOFARPC 支持自定义业务线程池。可以为指定服务设置一个独立的业务线程池,和 SOFARPC 自身的业务线程池是隔离的。多个服务可以共用一个独立的线程池。
SOFARPC 要求自定义线程池的类型必须是 com.alipay.sofa.rpc.server.UserThreadPool。
配置
@SofaService(bindings = {@SofaServiceBinding(bindingType = "bolt", userThreadPool = "customThreadPool")})
public class SampleServiceImpl implements SampleService {
}
2.2 RESTful
基本使用
import javax.ws.rs.GET;
import javax.ws.rs.Path;@Path("/sample")
public interface SampleService {@GET@Path("/hello")String HelloWorld();
}
@Service
@SofaService(uniqueId = "sampleService",bindings = @SofaServiceBinding(bindingType = "rest"))
public class SampleServiceImpl implements SampleService {@Overridepublic String HelloWorld() {System.out.println("==========================\nhello world!\n==========================\n");return "hello world";}
}
上述代码需放在同一项目中。
可以使用浏览器访问
http://localhost:8341/sample/hello
SOFARPC 的 RESTful 服务的默认端口为 8341。
可配置进行修改
com.alipay.sofa.rpc.rest.port=端口号
也可以用如下方式调用
@RunWith(SpringRunner.class)
@SpringBootTest(classes = ConsumerApplication.class, webEnvironment = SpringBootTest.WebEnvironment.DEFINED_PORT)
public class test {@SofaReference(uniqueId = "sampleService",binding = @SofaReferenceBinding(bindingType = "rest"))private SampleService sampleService;@Testpublic void testRPC(){System.out.println(sampleService.HelloWorld());}}
2.3 其他协议
除Http外,用法与Bolt类似。可参看协议基本使用
四、架构
附录
SOFARPC 方式快速入门
阿里云官方文档