Skip to content

Commit 78fbc2c

Browse files
committed
Add DisrutporSupport,use new thread to process event
1 parent 45d1264 commit 78fbc2c

File tree

6 files changed

+84
-13
lines changed

6 files changed

+84
-13
lines changed

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,11 @@
131131
<artifactId>aws-java-sdk-s3</artifactId>
132132
<version>1.11.812</version>
133133
</dependency>
134+
<dependency>
135+
<groupId>com.lmax</groupId>
136+
<artifactId>disruptor</artifactId>
137+
<version>3.4.2</version>
138+
</dependency>
134139
<dependency>
135140
<groupId>org.grpcmock</groupId>
136141
<artifactId>grpcmock-junit5</artifactId>

wechaty-puppet-padplus/pom.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,10 @@
1212

1313

1414
<dependencies>
15+
<dependency>
16+
<groupId>com.lmax</groupId>
17+
<artifactId>disruptor</artifactId>
18+
</dependency>
1519
<dependency>
1620
<groupId>com.amazonaws</groupId>
1721
<artifactId>aws-java-sdk-s3</artifactId>

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/PuppetPadplus.scala

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,9 +24,11 @@ class PuppetPadplus(val option:PuppetOptions,val storePath:String="/tmp/padplus"
2424
with GrpcSupport
2525
with GrpcEventSupport
2626
with LocalStoreSupport
27+
with DisrutporSupport
2728
with LazyLogging {
2829
protected var uinOpt:Option[String]=None
2930
def start(): Unit ={
31+
startDisruptor()
3032
startGrpc(option.endPoint.get)
3133
//waiting stream start....
3234
logger.info("waiting stream start....")
@@ -42,12 +44,15 @@ class PuppetPadplus(val option:PuppetOptions,val storePath:String="/tmp/padplus"
4244
}
4345
}
4446
def stop(): Unit = {
47+
shutdownDisruptor()
4548
stopGrpc()
4649
stopLocalStore()
4750
}
4851

4952
override def selfIdOpt(): Option[String] = selfId
5053

54+
55+
5156
/**
5257
*
5358
* Friendship
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package wechaty.padplus.support
2+
3+
import java.util.concurrent.{Executors, TimeUnit}
4+
5+
import com.lmax.disruptor.dsl.{Disruptor, ProducerType}
6+
import com.lmax.disruptor.{BusySpinWaitStrategy, EventFactory, EventHandler, ExceptionHandler}
7+
import com.typesafe.scalalogging.LazyLogging
8+
import wechaty.puppet.events.EventEmitter
9+
import wechaty.puppet.schemas.Puppet.PuppetEventName
10+
import wechaty.puppet.schemas.Puppet.PuppetEventName.Type
11+
12+
/**
13+
*
14+
* @author <a href="mailto:[email protected]">Jun Tsai</a>
15+
* @since 2020-06-30
16+
*/
17+
trait DisrutporSupport extends EventEmitter {
18+
self :LazyLogging =>
19+
private class EventDef {
20+
var eventName: PuppetEventName.Type = _
21+
var value : Any = _
22+
}
23+
private var disruptor:Disruptor[EventDef] = _
24+
def startDisruptor(): Unit ={
25+
disruptor = new Disruptor[EventDef](
26+
new EventFactory[EventDef]() {
27+
@Override
28+
def newInstance():EventDef= {
29+
new EventDef()
30+
}
31+
},
32+
1 << 32,
33+
Executors.defaultThreadFactory(),
34+
ProducerType.SINGLE,
35+
new BusySpinWaitStrategy()
36+
)
37+
disruptor.handleEventsWith(new EventHandler[EventDef](){
38+
override def onEvent(event: EventDef, sequence: Long, endOfBatch: Boolean): Unit = {
39+
logger.debug("emit event using super {}",event.eventName)
40+
DisrutporSupport.super.emit(event.eventName,event.value)
41+
}
42+
})
43+
disruptor.setDefaultExceptionHandler(new ExceptionHandler[EventDef] {
44+
override def handleEventException(ex: Throwable, sequence: Long, event: EventDef): Unit = {
45+
logger.error(ex.getMessage,ex)
46+
}
47+
48+
override def handleOnStartException(ex: Throwable): Unit = {
49+
throw ex
50+
}
51+
52+
override def handleOnShutdownException(ex: Throwable): Unit = {
53+
logger.error(ex.getMessage,ex)
54+
}
55+
})
56+
disruptor.start()
57+
}
58+
override def emit[T](eventName: Type, value: T): Unit = {
59+
logger.debug("publish event {}",eventName)
60+
disruptor.publishEvent((event: EventDef, sequence: Long) => {
61+
event.eventName = eventName
62+
event.value = value
63+
})
64+
}
65+
protected def shutdownDisruptor(): Unit ={
66+
disruptor.shutdown(5,TimeUnit.SECONDS);
67+
}
68+
}

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/GrpcSupport.scala

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -98,16 +98,6 @@ trait GrpcSupport {
9898
logger.info("start grpc client ....")
9999
this.grpcClient = PadPlusServerGrpc.newBlockingStub(channel)
100100
startStream()
101-
102-
// this.grpcClient.start(Base.StartRequest.newBuilder().build())
103-
//
104-
// try{
105-
// //sometime the grpc can't work well,so logout before start bot
106-
// // this.grpcClient.logout(Base.LogoutRequest.newBuilder().build())
107-
// }catch{
108-
// case e:Throwable=>
109-
// logger.warn(e.getMessage)
110-
// }
111101
logger.info("start grpc client done")
112102
}
113103

wechaty-puppet-padplus/src/main/scala/wechaty/padplus/support/MessageRawSupport.scala

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,9 +68,8 @@ trait MessageRawSupport {
6868
json.put("messageType",PadplusMessageType.Text.id)
6969
json.put("fromUserName",selfId.get)
7070
json.put("toUserName",conversationId)
71-
val payload = asyncRequest[GrpcMessagePayload](ApiType.SEND_MESSAGE,Some(json.toString))
72-
// payload.msgId
73-
null
71+
val payload = syncRequest[GrpcMessagePayload](ApiType.SEND_MESSAGE,Some(json.toString))
72+
payload.MsgId
7473
}
7574

7675
override def messageSendUrl(conversationId: String, urlLinkPayload: UrlLink.UrlLinkPayload): String = ???

0 commit comments

Comments
 (0)