// Listing metadata left over from the code viewer (not part of the program):
// 416 lines, 14 KiB, Go.

package baseserver

import (
	"math"
	"strconv"

	"rocommon"
	"rocommon/rpc"
	"rocommon/service"
	"rocommon/util"
	"roserver/baseserver/model"
	"roserver/baseserver/router"
	"roserver/serverproto"
)

// ServerTCPEventHook (game.backend) is the event hook installed on links
// between servers: it handles and dispatches the messages that servers
// exchange with each other.
type ServerTCPEventHook struct {
	// recvPingNum is not read or written by any method visible in this file;
	// InEvent counts pings through the session's own counter instead.
	recvPingNum int32
}

// def.go EventHook interface
|
||
func (this *ServerTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
|
||
switch msg := in.Msg().(type) {
|
||
case *serverproto.ServiceIdentifyACK: //来自其他服务器的连接确认信息
|
||
util.InfoF("[RecvServiceIdentifyACK]%v id=%v", msg.ServiceId, in.Session().ID())
|
||
// 重连时会有问题,重连上来时,但是上一个连接还未移除(正在移除中),导致重连失败(想连接的没连接上,该移除的正在移除)
|
||
// 通过PingReq超时断开连接,来触发断线重連
|
||
if serviceNode := model.GetServiceNode(msg.ServiceId); serviceNode == nil {
|
||
//添加连接上来的对端服务
|
||
model.AddServiceNode(in.Session(), msg.ServiceId, msg.ServiceName, "remote")
|
||
|
||
//服务器之间才启用heartbeat操作(只能反应ack端的send和发起连接端的recv是否正常) 5s
|
||
in.Session().HeartBeat(&serverproto.PingReq{NeedAck: true})
|
||
|
||
//如果是DB,则请求加载全局db数据
|
||
if msg.ServiceName == model.SERVICE_NODE_TYPE_DB_STR {
|
||
data, meta, err := rpc.EncodeMessage(&serverproto.SSGlobalLoadReq{})
|
||
if err == nil {
|
||
in.Session().Send(&serverproto.ServiceTransmitAck{
|
||
MsgId: uint32(meta.ID),
|
||
MsgData: data,
|
||
})
|
||
} else {
|
||
util.ErrorF("SSGlobalLoadReq err=%v", err)
|
||
}
|
||
}
|
||
}
|
||
case *serverproto.PingReq:
|
||
{
|
||
//来自ack服务器的ping消息
|
||
ctx := in.Session().(rocommon.ContextSet)
|
||
var sid *service.ETCDServiceDesc
|
||
in.Session().IncRecvPingNum(1)
|
||
if in.Session().RecvPingNum() >= 10 { //50s打印一次,收到10次打印一次
|
||
in.Session().IncRecvPingNum(-1)
|
||
if ctx.RawContextData("ctx", &sid) {
|
||
//util.InfoF("[RecvServicePing]Receive PingReq from session=%v node=%v", in.Session().ID(), sid.ID)
|
||
}
|
||
}
|
||
if msg.NeedAck {
|
||
in.Session().Send(&serverproto.PingReq{NeedAck: false})
|
||
}
|
||
}
|
||
case *rocommon.SessionConnected:
|
||
//连接上对应类型的服务器节点后,发送确认信息(ServiceIdentifyACK),告诉对端自己的服务器类型
|
||
ctx := in.Session().Node().(rocommon.ContextSet)
|
||
var sid *service.ETCDServiceDesc
|
||
//sid在CreateConnector中会指定
|
||
if ctx.RawContextData("sid", &sid) {
|
||
in.Session().Send(&serverproto.ServiceIdentifyACK{
|
||
//发送自身服务器节点的信息
|
||
ServiceName: service.GetServiceName(), //service/init.go
|
||
ServiceId: service.GetLocalServiceID(),
|
||
ServerStartTime: util.GetTimeMilliseconds(),
|
||
})
|
||
|
||
//添加远程的服务器节点到本地,sid服务器信息是从etcd中获取的
|
||
model.AddServiceNode(in.Session(), sid.ID, sid.Name, "local")
|
||
util.InfoF("[SendServiceIdentifyACK_local][%v]->[%v] id=%v", service.GetLocalServiceID(), sid.ID, in.Session().ID())
|
||
} else {
|
||
util.InfoF("connector not exist sid")
|
||
}
|
||
case *rocommon.SessionClosed:
|
||
closeSID := model.RemoveServiceNode(in.Session())
|
||
if closeSID != "" {
|
||
msg.CloseSId = closeSID
|
||
}
|
||
util.InfoF("[ServerTCPEventHook::InEvent] Readmsg error SessionClosed session=%v", in.Session().ID())
|
||
case *rocommon.SessionConnectError:
|
||
util.InfoF("[ServerTCPEventHook::InEvent] connector error=%v", msg.String())
|
||
}
|
||
return in
|
||
}
|
||
|
||
func (this *ServerTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
|
||
return out
|
||
}
|
||
|
||
// BackendTCPEventHook is the event hook used by backend servers for the
// messages they receive from and send to gate/db/auth.
type BackendTCPEventHook struct {
	// selectRouterIdx is the counter InEvent advances on every cross-server
	// forward; its value modulo the candidate count picks the router node.
	selectRouterIdx int
}

// def.go EventHook interface
|
||
// 后端服务器接收到来自gate/db/auth的消息
|
||
func (this *BackendTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
|
||
switch inMsg := in.Msg().(type) {
|
||
case *serverproto.GateTransmitAck:
|
||
userMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
|
||
if err != nil {
|
||
util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
|
||
return nil
|
||
}
|
||
//if inMsg.MsgId == 1173 {
|
||
// util.DebugF("kvtime recv cli=%v deltime=%v", inMsg.ClientId, util.GetTimeMilliseconds()-inMsg.KvTime)
|
||
//}
|
||
//封装成来自gate的消息事件
|
||
//todo...这边需要添加gate和game的连接信息,否则game工作线程获取session时会有多线程冲突
|
||
return &model.RecvGateMsgEvent{
|
||
Sess: in.Session(),
|
||
Message: userMsg,
|
||
ClientID: inMsg.ClientId,
|
||
MsgSeqId: inMsg.SeqId,
|
||
KvTime: inMsg.KvTime,
|
||
}
|
||
case *serverproto.ServiceTransmitAck:
|
||
//log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
|
||
transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
|
||
if err != nil {
|
||
util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
|
||
return nil
|
||
}
|
||
//cross begin
|
||
//判断是否是跨服操作(针对于social服务器)
|
||
routeRule := router.GetRuleByMsgID(int(inMsg.MsgId))
|
||
if routeRule != nil {
|
||
var serviceNodeList []string
|
||
switch routeRule.CrossMode {
|
||
case router.CrossMode_Type_Section:
|
||
//转发给router服务器
|
||
serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_CROSSROUTER_STR)
|
||
case router.CrossMode_Type_Global:
|
||
serviceNodeList = model.GetAllServiceNodeByName(model.SERVICE_NODE_TYPE_GLOBALCROSSROUTER_STR)
|
||
}
|
||
if len(serviceNodeList) > 0 {
|
||
this.selectRouterIdx++
|
||
if this.selectRouterIdx >= math.MaxInt32 {
|
||
this.selectRouterIdx = 0
|
||
}
|
||
selectIdx := this.selectRouterIdx % len(serviceNodeList)
|
||
serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
|
||
if serviceNode == nil {
|
||
return in
|
||
}
|
||
crossMsg := &serverproto.ServiceTransmitRouterNtf{
|
||
FromZone: int32(service.GetServiceConfig().Node.Zone),
|
||
MsgId: inMsg.MsgId,
|
||
MsgData: inMsg.MsgData,
|
||
ClientId: inMsg.ClientId,
|
||
TargetServiceNode: inMsg.TargetServiceNode,
|
||
}
|
||
serviceNode.Send(crossMsg)
|
||
return in
|
||
} else if routeRule.CrossMode > 0 {
|
||
util.FatalF("CrossNode Not Find corssMode=%v msgId=%v id=%v", routeRule.CrossMode, inMsg.MsgId, inMsg.ClientId)
|
||
return nil
|
||
}
|
||
}
|
||
//cross end
|
||
|
||
ctx := model.Session2Context(in.Session())
|
||
if ctx != nil {
|
||
return &model.RecvServiceMsgEvent{
|
||
Sess: in.Session(),
|
||
Message: transmitMsg,
|
||
ClientID: inMsg.ClientId,
|
||
ClientIDList: inMsg.ClientIdList,
|
||
ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
|
||
IsMaster: inMsg.IsMaster,
|
||
}
|
||
} else {
|
||
return &model.RecvServiceMsgEvent{
|
||
Sess: in.Session(),
|
||
Message: transmitMsg,
|
||
ClientID: inMsg.ClientId,
|
||
ClientIDList: inMsg.ClientIdList,
|
||
//ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
|
||
IsMaster: inMsg.IsMaster,
|
||
}
|
||
}
|
||
|
||
case *serverproto.ServiceTransmitRouterNtf:
|
||
//social不需要特殊处理的协议通过RecvRouterServiceMsgEvent,透传到上层进行转发
|
||
//log.Println("[BackendTCPEventHook::InEvent] DBTransmitAck db to game", inMsg)
|
||
transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
|
||
if err != nil {
|
||
util.WarnF("[BackendTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
|
||
return nil
|
||
}
|
||
ctx := model.Session2Context(in.Session())
|
||
return &model.RecvRouterServiceMsgEvent{
|
||
Sess: in.Session(),
|
||
Message: transmitMsg,
|
||
ClientID: inMsg.ClientId,
|
||
ClientIDList: inMsg.ClientIdList,
|
||
ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db/social的连接)
|
||
IsMaster: inMsg.IsMaster,
|
||
FromZone: inMsg.FromZone,
|
||
}
|
||
|
||
case *serverproto.ClientClosedACK:
|
||
//todo...
|
||
// 客户端关闭做处理,game做离线处理 放到default中处理
|
||
//log.Println("ClientClosedACK", inMsg)
|
||
default:
|
||
return in
|
||
}
|
||
return in
|
||
}
|
||
|
||
// 后端服务器发送到gate/db的消息
|
||
func (this *BackendTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
|
||
//todo...
|
||
switch out.Msg().(type) {
|
||
case *serverproto.ServiceTransmitAck:
|
||
//log.Println("[BackendTCPEventHook::OutEvent] ServiceTransmitAck game to gate/db", outMsg)
|
||
}
|
||
return out
|
||
}
|
||
|
||
// BackendTCPEventForCrossRouterHook is the event hook of a cross-server
// router node. It receives messages from social nodes and from cross-server
// function nodes.
type BackendTCPEventForCrossRouterHook struct {
	// selectRouterIdx is the counter InEvent advances on every forward; its
	// value modulo the candidate count picks the target node.
	selectRouterIdx int
}

func (this *BackendTCPEventForCrossRouterHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
|
||
switch inMsg := in.Msg().(type) {
|
||
case *serverproto.ServiceTransmitRouterNtf:
|
||
msgId := int(inMsg.MsgId)
|
||
routeRule := router.GetRuleByMsgID(msgId)
|
||
var serviceNodeList []string
|
||
if routeRule == nil {
|
||
//router 返回给所有social服务器
|
||
if inMsg.FromZone <= 0 {
|
||
//发给所有social节点
|
||
util.InfoF("BackendTCPEventForCrossRouterHook fromzone<=0 msgid=%v to all zone", msgId)
|
||
serviceNodeList = model.GetAllZoneSocialServiceNode(model.SERVICE_NODE_TYPE_SOCIAL_STR)
|
||
for idx := 0; idx < len(serviceNodeList); idx++ {
|
||
serviceNode := model.GetServiceNode(serviceNodeList[idx])
|
||
if serviceNode == nil {
|
||
continue
|
||
}
|
||
serviceNode.Send(inMsg)
|
||
}
|
||
return in
|
||
}
|
||
//router 返回给对应zone服务器
|
||
serviceNodeList = model.GetAllSocialServiceNodeByZone(int(inMsg.FromZone), model.SERVICE_NODE_TYPE_SOCIAL_STR)
|
||
} else {
|
||
//表示发往router并需要router转发给其他功能服务器
|
||
//通过proto中的RouteRule来确定当前协议的功能服务器
|
||
if inMsg.TargetServiceNode != "" {
|
||
//发送到目地节点服务器
|
||
serviceNode := model.GetServiceNode(inMsg.TargetServiceNode)
|
||
if serviceNode != nil {
|
||
serviceNode.Send(inMsg)
|
||
}
|
||
return in
|
||
} else {
|
||
serviceNodeList = model.GetAllServiceNodeByName(routeRule.Mod)
|
||
}
|
||
}
|
||
if len(serviceNodeList) <= 0 {
|
||
util.ErrorF("BackendTCPEventForCrossRouterHook service node not exist nodename=%v msgid=%v fromZone=%V", routeRule, msgId, inMsg.FromZone)
|
||
return in
|
||
}
|
||
this.selectRouterIdx++
|
||
if this.selectRouterIdx >= math.MaxInt32 {
|
||
this.selectRouterIdx = 0
|
||
}
|
||
selectIdx := this.selectRouterIdx % len(serviceNodeList)
|
||
serviceNode := model.GetServiceNode(serviceNodeList[selectIdx])
|
||
if serviceNode == nil {
|
||
return in
|
||
}
|
||
serviceNode.Send(inMsg)
|
||
case *serverproto.ServiceTransmitAck:
|
||
transmitMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
|
||
if err != nil {
|
||
util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
|
||
return nil
|
||
}
|
||
|
||
//chybenchmark
|
||
//this.kvTimeMsgLog(int32(inMsg.MsgId))
|
||
|
||
//log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
|
||
ctx := model.Session2Context(in.Session())
|
||
return &model.RecvServiceMsgEvent{
|
||
Sess: in.Session(),
|
||
Message: transmitMsg,
|
||
ClientID: inMsg.ClientId,
|
||
ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
|
||
}
|
||
}
|
||
return in
|
||
}
|
||
|
||
func (this *BackendTCPEventForCrossRouterHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
|
||
return out
|
||
}
|
||
|
||
// /////////////////////////////////////////DBTCPEventHook
|
||
// 处理game和db之间的消息
|
||
type ServiceTCPEventHook struct {
|
||
kvTimeMsgNumList []serverproto.KeyValueType
|
||
CurTime uint64
|
||
}
|
||
|
||
func (this *ServiceTCPEventHook) kvTimeMsgLog(msgId int32) {
|
||
bChange := false
|
||
for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
|
||
if this.kvTimeMsgNumList[idx].Key == msgId {
|
||
this.kvTimeMsgNumList[idx].Value++
|
||
bChange = true
|
||
break
|
||
}
|
||
}
|
||
if !bChange {
|
||
this.kvTimeMsgNumList = append(this.kvTimeMsgNumList,
|
||
serverproto.KeyValueType{Key: msgId, Value: 1})
|
||
}
|
||
nowTime := util.GetTimeMilliseconds()
|
||
if this.CurTime <= 0 {
|
||
this.CurTime = nowTime
|
||
} else if nowTime-this.CurTime > 1000 {
|
||
this.CurTime = nowTime
|
||
printfListStr := ""
|
||
for idx := 0; idx < len(this.kvTimeMsgNumList); idx++ {
|
||
printfListStr += "\n" +
|
||
strconv.Itoa(int(this.kvTimeMsgNumList[idx].Key)) + "-" +
|
||
strconv.Itoa(int(this.kvTimeMsgNumList[idx].Value))
|
||
}
|
||
util.DebugF("printfListStr=%v", printfListStr)
|
||
}
|
||
}
|
||
|
||
// db接收来自其他服务器的消息
|
||
func (this *ServiceTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
|
||
switch inMsg := in.Msg().(type) {
|
||
case *serverproto.ServiceTransmitAck:
|
||
dbMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
|
||
if err != nil {
|
||
util.WarnF("[DBTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
|
||
return nil
|
||
}
|
||
|
||
//chybenchmark
|
||
//this.kvTimeMsgLog(int32(inMsg.MsgId))
|
||
|
||
//log.Println("[DBTCPEventHook::InEvent] DBTransmitAck", inMsg)
|
||
ctx := model.Session2Context(in.Session())
|
||
return &model.RecvServiceMsgEvent{
|
||
Sess: in.Session(),
|
||
Message: dbMsg,
|
||
ClientID: inMsg.ClientId,
|
||
ServiceID: ctx.ID, //当前session对应的服务器节点信息(game和db的连接)
|
||
}
|
||
}
|
||
return in
|
||
}
|
||
|
||
// db发送到其他服务器的消息
|
||
func (this *ServiceTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
|
||
//todo...
|
||
switch out.Msg().(type) {
|
||
case *serverproto.ServiceTransmitAck:
|
||
//log.Println("[DBTCPEventHook::OutEvent] DBTransmitAck db to game...", outMsg)
|
||
}
|
||
return out
|
||
}
|
||
|
||
/*
///////////////////////////////////////////AuthTCPEventHook
// Handles the messages exchanged between auth and the other servers.
type AuthTCPEventHook struct{
}
// Messages auth receives from other servers.
func (this *AuthTCPEventHook) InEvent(in rocommon.ProcEvent) rocommon.ProcEvent {
	switch inMsg := in.Msg().(type) {
	case *serverproto.ServiceTransmitAck:
		gateMsg, _, err := rpc.DecodeMessage(int(inMsg.MsgId), inMsg.MsgData)
		if err != nil {
			util.WarnF("[AuthTCPEventHook::InEvent] msg decode err:%v msgId:%v", err.Error(), inMsg.MsgId)
			return nil
		}
		ctx := model.Session2Context(in.Session())
		return &model.RecvServiceMsgEvent{
			Sess: in.Session(),
			Message: gateMsg,
			ClientID: inMsg.ClientId,
			ServiceID: ctx.ID, // server node behind the current session (game to db link)
		}
	}
	return in
}
// Messages auth sends to other servers.
func (this *AuthTCPEventHook) OutEvent(out rocommon.ProcEvent) rocommon.ProcEvent {
	//todo...
	return out
}
*/