[RabbitMQ]13_RabbitMQ学习之spring-amqp的重要类的认识

   

对于大多数应用来说都做了与spring整合,对于rabbitmq来说。也有与spring的整合。可能通过spring的官网找到spring-amqp项目下载。spring-amqp项目包括三个子项目:spring-amqp、spring-erlang、spring-rabbit.

了 (76).jpg

下面来认识一下spring-amqp中的几个重要类;以spring-amqp-1.0.0.M3版本为例

1、Message: Spring AMQP定义的Message类是AMQP域模型中代表之一。Message类封装了body(消息BODY)和properties(消息属性) 。使得这个API看起来很简单。Message类定义如下:

 

[java]view plaincopy

print?

publicclassMessage{ privatefinalMessagePropertiesmessageProperties; privatefinalbyte[]body; publicMessage(byte[]body,MessagePropertiesmessageProperties){ this.body=body; this.messageProperties=messageProperties; } publicbyte[]getBody(){ returnthis.body; } publicMessagePropertiesgetMessageProperties(){ returnthis.messageProperties; } }

 

其中MessageProperties类中定义了例如messageId、timestamp、contentType等属性。这此属性可以扩展到用户通过setHeader(String key, Object value)方法来自定义“headers”。

2、Exchange
Exchange接口代表一个AMQP的Exchange,决定消息生产者发送消息。每个Exchange都包括一个特定的唯一名字的虚拟主机的代理和一些其他属性。

 

[java]view plaincopy

print?

publicinterfaceExchange{ StringgetName(); StringgetType(); booleanisDurable(); booleanisAutoDelete(); Map<String,Object>getArguments(); }

其中 AbstractExchange类实现了Exchange类。而DirectExchange、TopicExchange、FanoutExchang、HeadersExchange四个类继承AbstractExchange。并重写了getType()类。根据各自相对应的Exchange类型。DirectExchange、TopicExchange、FanoutExchang、HeadersExchange分别对应的类型为direct,topic,fanout,headers.

 

3、Queue
Queue类是消息消费者接收消息中重要的一个组成部分。通过与Exchange判定来肯定消费者所接收的消息。伪代码如下:

 

[java]view plaincopy

print?

publicclassQueue{ privatefinalStringname; privatevolatilebooleandurable; privatevolatilebooleanexclusive; privatevolatilebooleanautoDelete; privatevolatileMap<String,Object>arguments; publicQueue(Stringname){ this.name=name; }

其中name表示队列的名称、durable表示持久性。true表示是。exclusive表示独占性。由于在AmqpTemplate中提供一个方法来得到唯一的队列。这个队列可能是一个”reply-to“地址或者其他信息,因此一般exclusive和autoDelete一般设定为true.

 

4、Binding
Bingding类通过多种构造参数来判定Exchange,Queue,routingkey;例如

 

[java]view plaincopy

print?

Binding(Queuequeue,FanoutExchangeexchange) Binding(Queuequeue,HeadersExchangeexchange,Map<String,Object>arguments) Binding(Queuequeue,DirectExchangeexchange) Binding(Queuequeue,DirectExchangeexchange,StringroutingKey) Binding(Queuequeue,TopicExchangeexchange,StringroutingKey)

5、AmqpTemplate
AmqpTemplate是用来发送消息的模板类

 

 

[java]view plaincopy

print?

/** *SpecifiesabasicsetofAMQPoperations. * *Providessynchronoussendandreceivemethods.The{@link#convertAndSend(Object)}and{@link#receiveAndConvert()} *methodsallowletyousendandreceivePOJOobjects.Implementationsareexpectedtodelegatetoaninstanceof *{@linkMessageConverter}toperformconversiontoandfromAMQPbyte[]payloadtype. * *@authorMarkPollack *@authorMarkFisher */ publicinterfaceAmqpTemplate{ //sendmethodsformessages /** *Sendamessagetoadefaultexchangewithadefaultroutingkey. * *@parammessageamessagetosend *@throwsAmqpExceptionifthereisaproblem */ voidsend(Messagemessage)throwsAmqpException; /** *Sendamessagetoadefaultexchangewithaspecificroutingkey. * *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@throwsAmqpExceptionifthereisaproblem */ voidsend(StringroutingKey,Messagemessage)throwsAmqpException; /** *Sendamessagetoaspecificexchangewithaspecificroutingkey. * *@paramexchangethenameoftheexchange *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@throwsAmqpExceptionifthereisaproblem */ voidsend(Stringexchange,StringroutingKey,Messagemessage)throwsAmqpException; //sendmethodswithconversion /** *ConvertaJavaobjecttoanAmqp{@linkMessage}andsendittoadefaultexchangewithadefaultroutingkey. * *@parammessageamessagetosend *@throwsAmqpExceptionifthereisaproblem */ voidconvertAndSend(Objectmessage)throwsAmqpException; /** *ConvertaJavaobjecttoanAmqp{@linkMessage}andsendittoadefaultexchangewithaspecificroutingkey. * *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@throwsAmqpExceptionifthereisaproblem */ voidconvertAndSend(StringroutingKey,Objectmessage)throwsAmqpException; /** *ConvertaJavaobjecttoanAmqp{@linkMessage}andsendittoaspecificexchangewithaspecificroutingkey. * *@paramexchangethenameoftheexchange *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@throwsAmqpExceptionifthereisaproblem */ voidconvertAndSend(Stringexchange,StringroutingKey,Objectmessage)throwsAmqpException; /** *ConvertaJavaobjecttoanAmqp{@linkMessage}andsendittoadefaultexchangewithadefaultroutingkey. * *@parammessageamessagetosend *@parammessagePostProcessoraprocessortoapplytothemessagebeforeitissent *@throwsAmqpExceptionifthereisaproblem */ voidconvertAndSend(Objectmessage,MessagePostProcessormessagePostProcessor)throwsAmqpException; /** *ConvertaJavaobjecttoanAmqp{@linkMessage}andsendittoadefaultexchangewithaspecificroutingkey. * *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@parammessagePostProcessoraprocessortoapplytothemessagebeforeitissent *@throwsAmqpExceptionifthereisaproblem */ voidconvertAndSend(StringroutingKey,Objectmessage,MessagePostProcessormessagePostProcessor) throwsAmqpException; /** *ConvertaJavaobjecttoanAmqp{@linkMessage}andsendittoaspecificexchangewithaspecificroutingkey. * *@paramexchangethenameoftheexchange *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@parammessagePostProcessoraprocessortoapplytothemessagebeforeitissent *@throwsAmqpExceptionifthereisaproblem */ voidconvertAndSend(Stringexchange,StringroutingKey,Objectmessage,MessagePostProcessormessagePostProcessor) throwsAmqpException; //receivemethodsformessages /** *Receiveamessageifthereisonefromadefaultqueue.Returnsimmediately,possiblywithanullvalue. * *@returnamessageornullifthereisnonewaiting *@throwsAmqpExceptionifthereisaproblem */ Messagereceive()throwsAmqpException; /** *Receiveamessageifthereisonefromaspecificqueue.Returnsimmediately,possiblywithanullvalue. * *@paramqueueNamethenameofthequeuetopoll *@returnamessageornullifthereisnonewaiting *@throwsAmqpExceptionifthereisaproblem */ Messagereceive(StringqueueName)throwsAmqpException; //receivemethodswithconversion /** *ReceiveamessageifthereisonefromadefaultqueueandconvertittoaJavaobject.Returnsimmediately, *possiblywithanullvalue. * *@returnamessageornullifthereisnonewaiting *@throwsAmqpExceptionifthereisaproblem */ ObjectreceiveAndConvert()throwsAmqpException; /** *ReceiveamessageifthereisonefromaspecificqueueandconvertittoaJavaobject.Returnsimmediately, *possiblywithanullvalue. * *@paramqueueNamethenameofthequeuetopoll *@returnamessageornullifthereisnonewaiting *@throwsAmqpExceptionifthereisaproblem */ ObjectreceiveAndConvert(StringqueueName)throwsAmqpException; //sendandreceivemethodsformessages /** *BasicRPCpattern.Sendamessagetoadefaultexchangewithadefaultroutingkeyandattempttoreceivea *response.Implementationswillnormallysetthereply-toheadertoanexclusivequeueandwaitupforsometime *limitedbyatimeout. * *@parammessageamessagetosend *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ MessagesendAndReceive(Messagemessage)throwsAmqpException; /** *BasicRPCpattern.Sendamessagetoadefaultexchangewithaspecificroutingkeyandattempttoreceivea *response.Implementationswillnormallysetthereply-toheadertoanexclusivequeueandwaitupforsometime *limitedbyatimeout. * *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ MessagesendAndReceive(StringroutingKey,Messagemessage)throwsAmqpException; /** *BasicRPCpattern.Sendamessagetoaspecificexchangewithaspecificroutingkeyandattempttoreceivea *response.Implementationswillnormallysetthereply-toheadertoanexclusivequeueandwaitupforsometime *limitedbyatimeout. * *@paramexchangethenameoftheexchange *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ MessagesendAndReceive(Stringexchange,StringroutingKey,Messagemessage)throwsAmqpException; //sendandreceivemethodswithconversion /** *BasicRPCpatternwithconversion.SendaJavaobjectconvertedtoamessagetoadefaultexchangewithadefault *routingkeyandattempttoreceivearesponse,convertingthattoaJavaobject.Implementationswillnormally *setthereply-toheadertoanexclusivequeueandwaitupforsometimelimitedbyatimeout. * *@parammessageamessagetosend *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ ObjectconvertSendAndReceive(Objectmessage)throwsAmqpException; /** *BasicRPCpatternwithconversion.SendaJavaobjectconvertedtoamessagetoadefaultexchangewitha *specificroutingkeyandattempttoreceivearesponse,convertingthattoaJavaobject.Implementationswill *normallysetthereply-toheadertoanexclusivequeueandwaitupforsometimelimitedbyatimeout. * *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ ObjectconvertSendAndReceive(StringroutingKey,Objectmessage)throwsAmqpException; /** *BasicRPCpatternwithconversion.SendaJavaobjectconvertedtoamessagetoaspecificexchangewitha *specificroutingkeyandattempttoreceivearesponse,convertingthattoaJavaobject.Implementationswill *normallysetthereply-toheadertoanexclusivequeueandwaitupforsometimelimitedbyatimeout. * *@paramexchangethenameoftheexchange *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ ObjectconvertSendAndReceive(Stringexchange,StringroutingKey,Objectmessage)throwsAmqpException; /** *BasicRPCpatternwithconversion.SendaJavaobjectconvertedtoamessagetoadefaultexchangewithadefault *routingkeyandattempttoreceivearesponse,convertingthattoaJavaobject.Implementationswillnormally *setthereply-toheadertoanexclusivequeueandwaitupforsometimelimitedbyatimeout. * *@parammessageamessagetosend *@parammessagePostProcessoraprocessortoapplytothemessagebeforeitissent *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ ObjectconvertSendAndReceive(Objectmessage,MessagePostProcessormessagePostProcessor)throwsAmqpException; /** *BasicRPCpatternwithconversion.SendaJavaobjectconvertedtoamessagetoadefaultexchangewitha *specificroutingkeyandattempttoreceivearesponse,convertingthattoaJavaobject.Implementationswill *normallysetthereply-toheadertoanexclusivequeueandwaitupforsometimelimitedbyatimeout. * *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@parammessagePostProcessoraprocessortoapplytothemessagebeforeitissent *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ ObjectconvertSendAndReceive(StringroutingKey,Objectmessage,MessagePostProcessormessagePostProcessor)throwsAmqpException; /** *BasicRPCpatternwithconversion.SendaJavaobjectconvertedtoamessagetoaspecificexchangewitha *specificroutingkeyandattempttoreceivearesponse,convertingthattoaJavaobject.Implementationswill *normallysetthereply-toheadertoanexclusivequeueandwaitupforsometimelimitedbyatimeout. * *@paramexchangethenameoftheexchange *@paramroutingKeytheroutingkey *@parammessageamessagetosend *@parammessagePostProcessoraprocessortoapplytothemessagebeforeitissent *@returntheresponseifthereisone *@throwsAmqpExceptionifthereisaproblem */ ObjectconvertSendAndReceive(Stringexchange,StringroutingKey,Objectmessage,MessagePostProcessormessagePostProcessor)throwsAmqpException; }


6、AmqpAdmin和RabbitAdmin
用户配置Queue、Exchange、Binding的代理类。代理类会自动声明或创建这些配置信息。
下面这个类用于异步接收消息的处理类