手撸golang GO与微服务 ChatServer之2


本文摘自网络,作者:老罗话编程,如有侵权请联系删除。

缘起

最近阅读《Go微服务实战》(刘金亮, 2021.1)
本系列笔记拟采用golang练习之

案例需求(聊天服务器)

  • 用户可以连接到服务器。
  • 用户可以设定自己的用户名。
  • 用户可以向服务器发送消息,同时服务器也会向其他用户广播该消息。

目标

  • 实现聊天服务端, 支持端口监听, 多个客户端的连入, 消息收发, 广播, 断开, 并采集日志
  • 改造已有的聊天客户端, 使之能同时适配客户端和服务端的通信, 并设置写缓冲以防止死锁
  • 测试多个客户端的连入, 收发和断开, 并诊断服务端日志

设计

  • IMsg: 定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码
  • IMsgDecoder: 定义消息解码器及其实现
  • IChatClient: 定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.
  • tChatClient: 聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.
  • IChatServer: 定义聊天服务器接口, 为方便测试, 提供日志采集方法
  • tChatServer: 实现聊天服务器IChatServer

单元测试

ChatServer_test.go

package chat_server

import (
    "fmt"
    cs "learning/gooop/chat_server"
    "strings"
    "testing"
    "time"
)

// Test_ChatServer starts a chat server on a fixed port, connects several
// clients that each announce a name and then send three chat messages one
// second apart, waits five seconds, closes the server, and finally checks the
// server's collected logs for one "incoming connection" entry and one
// "client closed" entry per client.
func Test_ChatServer(t *testing.T) {
    fnAssertTrue := func(b bool, msg string) {
        if !b {
            t.Fatal(msg)
        }
    }

    port := 3333
    server := cs.NewChatServer()
    err := server.Open(port)
    if err != nil {
        t.Fatal(err)
    }

    clientCount := 3
    address := fmt.Sprintf("localhost:%v", port)
    for i := 0; i < clientCount; i++ {
        err, client := cs.DialChatClient(address)
        if err != nil {
            t.Fatal(err)
        }

        id := fmt.Sprintf("c%02d", i)
        client.RecvHandler(func(client cs.IChatClient, msg cs.IMsg) {
            t.Logf("%v recv: %v\n", id, msg)
        })

        go func() {
            client.SetName(id)
            // Keyed fields keep the literal valid if the struct gains or
            // reorders fields (and satisfy go vet's composites check).
            client.Send(&cs.NameMsg{Name: id})

            // An explicit ticker can be stopped once the loop is done,
            // unlike the one created by time.Tick.
            ticker := time.NewTicker(time.Second)
            defer ticker.Stop()

            n := 0
            for range ticker.C {
                client.Send(&cs.ChatMsg{Name: id, Words: fmt.Sprintf("msg %02d from %v", n, id)})

                n++
                if n >= 3 {
                    break
                }
            }

            client.Close()
        }()
    }

    // Give the clients time to finish their exchange before shutting down.
    waitTicker := time.NewTicker(time.Second)
    passedSeconds := 0
    for range waitTicker.C {
        passedSeconds++
        t.Logf("%v seconds passed", passedSeconds)

        if passedSeconds >= 5 {
            break
        }
    }
    waitTicker.Stop()
    server.Close()

    logs := server.GetLogs()
    fnHasLog := func(log string) bool {
        for _, it := range logs {
            if strings.Contains(it, log) {
                return true
            }
        }
        return false
    }

    for i := 0; i < clientCount; i++ {
        msg := fmt.Sprintf("tChatServer.handleIncomingConn, clientCount=%v", i+1)
        fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)

        msg = fmt.Sprintf("tChatServer.handleClientClosed, c%02d", i)
        fnAssertTrue(fnHasLog(msg), "expecting log: "+msg)
    }
}

测试输出

$ go test -v ChatServer_test.go 
=== RUN   Test_ChatServer
tChatServer.handleIncomingConn, clientCount=1
tChatServer.handleIncomingConn, clientCount=2
tChatServer.handleIncomingConn, clientCount=3
    ChatServer_test.go:59: 1 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c02 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 00 from c00}
    ChatServer_test.go:35: c01 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c01 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 00 from c01}
    ChatServer_test.go:35: c00 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c02 recv: &{c02 msg 00 from c02}
    ChatServer_test.go:35: c00 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c01 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c01 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c02 recv: &{c01 msg 01 from c01}
    ChatServer_test.go:35: c02 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:59: 2 seconds passed
    ChatServer_test.go:35: c00 recv: &{c00 msg 01 from c00}
    ChatServer_test.go:35: c02 recv: &{c02 msg 01 from c02}
    ChatServer_test.go:35: c00 recv: &{c02 msg 01 from c02}
tChatClient.postConnClosed, c00, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=false
tChatClient.postConnClosed, c01, serverFlag=false
tChatClient.postConnClosed, c02, serverFlag=true
tChatServer.handleClientClosed, c02
tChatServer.handleClientClosed, c02, clientCount=2
tChatClient.postConnClosed, c01, serverFlag=true
tChatServer.handleClientClosed, c01
tChatServer.handleClientClosed, c01, clientCount=1
    ChatServer_test.go:59: 3 seconds passed
tChatClient.postConnClosed, c00, serverFlag=true
tChatServer.handleClientClosed, c00
tChatServer.handleClientClosed, c00, clientCount=0
    ChatServer_test.go:59: 4 seconds passed
    ChatServer_test.go:59: 5 seconds passed
--- PASS: Test_ChatServer (5.00s)
PASS
ok      command-line-arguments  5.003s

IMsg.go

定义消息接口, 以及相关消息的实现. 为方便任意消息内容的解码, 消息传输时, 采用base64转码

package chat_server

import (
    "encoding/base64"
    "fmt"
)

// IMsg is a chat protocol message that can be serialized for transmission.
type IMsg interface {
    // Encode returns the message in its textual wire form. The
    // implementations in this file produce one newline-terminated line.
    Encode() string
}

// NameMsg announces the user name chosen by a client.
type NameMsg struct {
    Name string
}

// Encode renders the message as the line "NAME <name>\n", where <name> is the
// standard base64 encoding of Name.
func (me *NameMsg) Encode() string {
    encodedName := base64.StdEncoding.EncodeToString([]byte(me.Name))
    return fmt.Sprintf("NAME %s\n", encodedName)
}

// ChatMsg carries one line of chat text together with the sender's name.
type ChatMsg struct {
    Name string
    Words string
}

// Encode renders the message as the line "CHAT <name> <words>\n", where both
// fields are standard base64 encodings, so neither can contain a space or a
// newline on the wire.
func (me *ChatMsg) Encode() string {
    enc := base64.StdEncoding
    encodedName := enc.EncodeToString([]byte(me.Name))
    encodedWords := enc.EncodeToString([]byte(me.Words))
    return fmt.Sprintf("CHAT %s %s\n", encodedName, encodedWords)
}

IMsgDecoder.go

定义消息解码器及其实现

package chat_server

import (
    "encoding/base64"
    "strings"
)


// IMsgDecoder turns one received text line back into a message.
type IMsgDecoder interface {
    // Decode parses line. It returns (true, msg) on success and (false, nil)
    // when the line is not a recognized, well-formed message.
    Decode(line string) (bool, IMsg)
}

// tMsgDecoder is the stateless IMsgDecoder implementation.
type tMsgDecoder struct {
}

// Decode splits line on single spaces and rebuilds the message named by the
// first field:
//
//   - "NAME <b64 name>"             -> *NameMsg
//   - "CHAT <b64 name> <b64 words>" -> *ChatMsg
//
// It returns (false, nil) for an unknown keyword, a wrong field count, or a
// field that is not valid standard base64. A line terminator left on the last
// field is tolerated because the base64 decoder ignores CR and LF characters.
func (me *tMsgDecoder) Decode(line string) (bool, IMsg) {
    fields := strings.Split(line, " ")
    decode := base64.StdEncoding.DecodeString

    switch {
    case fields[0] == "NAME" && len(fields) == 2:
        name, err := decode(fields[1])
        if err != nil {
            return false, nil
        }
        return true, &NameMsg{Name: string(name)}

    case fields[0] == "CHAT" && len(fields) == 3:
        name, err := decode(fields[1])
        if err != nil {
            return false, nil
        }
        words, err := decode(fields[2])
        if err != nil {
            return false, nil
        }
        return true, &ChatMsg{Name: string(name), Words: string(words)}
    }

    return false, nil
}


// MsgDecoder is the shared decoder instance used to parse received lines.
var MsgDecoder = &tMsgDecoder{}

IChatClient.go

定义聊天客户端接口. 本次添加关闭通知方法, 以适配服务端.

package chat_server

// IChatClient is one end of a chat connection. It is used both for clients
// that dial a server and for the server-side wrapper of an accepted connection.
type IChatClient interface {
    // GetName returns the client's current name.
    GetName() string
    // SetName replaces the client's name.
    SetName(name string)

    // Send submits msg for delivery to the peer.
    Send(msg IMsg)
    // RecvHandler registers the callback invoked for each decoded incoming message.
    RecvHandler(handler ClientRecvFunc)
    // CloseHandler registers the callback invoked after the connection has been closed.
    CloseHandler(handler ClientCloseFunc)

    // Close requests that the connection be shut down.
    Close()
}

// ClientRecvFunc is the callback type for incoming messages; client is the
// connection the message arrived on.
type ClientRecvFunc func(client IChatClient, msg IMsg)

// ClientCloseFunc is the callback type for connection-closed notifications.
type ClientCloseFunc func(client IChatClient)

tChatClient.go

聊天客户端, 实现IChatClient接口. 本次添加关闭通知, 写缓冲和读超时控制, 修复写循环细节问题.

package chat_server

import (
    "bufio"
    "fmt"
    "io"
    "net"
    "sync/atomic"
    "time"
)

// tChatClient implements IChatClient on top of a net.Conn, using one
// goroutine for reading and one for writing.
type tChatClient struct {
    conn net.Conn
    // name is the client's name; openChatClient initializes it to "anonymous".
    name string
    // openFlag is switched 0 -> 1 by open so the goroutines start only once.
    openFlag int32
    // closeFlag is 0 while open, 1 once closeConn has requested a shutdown,
    // and 2 after the write loop has closed the connection.
    closeFlag int32
    // serverFlag is the value passed to openChatClient: true for connections
    // accepted by the server, false for connections created by DialChatClient.
    serverFlag bool

    // closeChan tells the write loop to close the connection and exit.
    closeChan chan bool
    // sendChan queues outbound messages; its capacity is gMaxPendingSend.
    sendChan chan IMsg

    // sendLogs records messages that were written to the connection.
    sendLogs []IMsg
    // dropLogs records messages that Send refused because the queue was full.
    dropLogs []IMsg
    // recvLogs records messages that were received and decoded.
    recvLogs []IMsg
    // pendingSend counts messages queued in sendChan but not yet written.
    pendingSend int32

    recvHandler ClientRecvFunc
    closeHandler ClientCloseFunc
}

// gMaxPendingSend is the capacity of every client's outbound queue and the
// limit beyond which Send drops messages.
var gMaxPendingSend int32 = 100

// DialChatClient opens a TCP connection to address and wraps it in a running
// chat client (serverFlag=false). Note the unusual result order: the error
// comes first. On failure it returns (err, nil).
func DialChatClient(address string) (error, IChatClient) {
    conn, dialErr := net.Dial("tcp", address)
    if dialErr != nil {
        return dialErr, nil
    }

    client := openChatClient(conn, false)
    return nil, client
}

// openChatClient wraps conn in a tChatClient named "anonymous", starts its
// read and write goroutines, and returns it. serverFlag records whether the
// connection was accepted by a server (true) or dialed by a client (false).
func openChatClient(conn net.Conn, serverFlag bool) IChatClient {
    client := new(tChatClient) // openFlag and closeFlag start at their zero value
    client.conn = conn
    client.name = "anonymous"
    client.serverFlag = serverFlag

    client.closeChan = make(chan bool)
    client.sendChan = make(chan IMsg, gMaxPendingSend)

    client.sendLogs = []IMsg{}
    client.dropLogs = []IMsg{}
    client.recvLogs = []IMsg{}

    client.open()
    return client
}


// GetName returns the client's current name. The field is read without
// synchronization.
func (me *tChatClient) GetName() string {
    return me.name
}

// SetName replaces the client's name. The field is written without
// synchronization.
func (me *tChatClient) SetName(name string) {
    me.name = name
}

// open starts the write and read goroutines. Only the first call has any
// effect; later calls lose the openFlag compare-and-swap and do nothing.
func (me *tChatClient) open() {
    if atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
        go me.beginWrite()
        go me.beginRead()
    }
}


// isClosed reports whether a shutdown has been requested or completed, i.e.
// closeFlag is no longer 0. closeFlag is changed by closeConn with an atomic
// compare-and-swap, possibly on another goroutine, so it is loaded atomically
// here to avoid a data race.
func (me *tChatClient) isClosed() bool {
    return atomic.LoadInt32(&me.closeFlag) != 0
}

// isNotClosed is the negation of isClosed.
func (me *tChatClient) isNotClosed() bool {
    return !me.isClosed()
}

// Send queues msg for the write loop. It never blocks: if the client is
// closed the message is ignored, and if the outbound queue is full the
// message is recorded in dropLogs instead of being sent.
//
// The enqueue is a non-blocking select rather than a "check the counter, then
// send" sequence. Concurrent callers could all pass the counter check and
// then block on a full channel; once the write loop has exited, that block
// would be permanent.
//
// NOTE(review): dropLogs is appended without synchronization; concurrent
// Send calls that both hit the full-queue path would race on it.
func (me *tChatClient) Send(msg IMsg) {
    if me.isClosed() {
        return
    }

    // Count the message before it becomes visible to the write loop, which
    // decrements the counter as soon as it dequeues.
    atomic.AddInt32(&me.pendingSend, 1)
    select {
    case me.sendChan <- msg:
        // queued

    default:
        // Queue full: undo the count and record the drop.
        atomic.AddInt32(&me.pendingSend, -1)
        me.dropLogs = append(me.dropLogs, msg)
    }
}

// RecvHandler installs the callback invoked for every decoded incoming
// message. The call is ignored once the client is closed.
func (me *tChatClient) RecvHandler(handler ClientRecvFunc) {
    if me.isClosed() {
        return
    }
    me.recvHandler = handler
}


// CloseHandler installs the callback invoked after the connection has been
// closed. The call is ignored once the client is closed.
func (me *tChatClient) CloseHandler(handler ClientCloseFunc) {
    if me.isClosed() {
        return
    }
    me.closeHandler = handler
}


// Close requests shutdown of the connection. It does nothing if a shutdown
// has already been requested.
func (me *tChatClient) Close() {
    if me.isClosed() {
        return
    }
    me.closeConn()
}

// beginWrite is the write loop; open runs it on its own goroutine. Each
// iteration waits for one of three events:
//
//   - a close signal on closeChan: close the connection, mark the client
//     fully closed (closeFlag = 2), run the close notification, and exit;
//   - a queued message on sendChan: encode it and write it to the connection;
//   - 10 seconds without either: treat the connection as timed out.
//
// This loop is the only receiver on closeChan. Shutdown requests that start
// here (write failure, timeout) are therefore issued from a separate
// goroutine, so that signalling on closeChan can never stall the loop that
// has to receive the signal.
func (me *tChatClient) beginWrite() {
    writer := io.Writer(me.conn)
    for {
        select {
        case <-me.closeChan:
            // The connection is being torn down; a Close error is not actionable.
            _ = me.conn.Close()
            // closeFlag is read and compare-and-swapped from other goroutines,
            // so it is stored atomically.
            atomic.StoreInt32(&me.closeFlag, 2)
            me.postConnClosed()
            return

        case msg := <-me.sendChan:
            atomic.AddInt32(&me.pendingSend, -1)
            if _, e := writer.Write([]byte(msg.Encode())); e != nil {
                go me.closeConn()
                continue
            }
            me.sendLogs = append(me.sendLogs, msg)

        case <-time.After(time.Duration(10) * time.Second):
            go me.postRecvTimeout()
        }
    }
}

// postRecvTimeout prints a timeout notice and requests connection shutdown.
// Despite the name, it is triggered by the write loop's 10-second idle timer,
// not by a read deadline.
func (me *tChatClient) postRecvTimeout() {
    fmt.Printf("tChatClient.postRecvTimeout, %v, serverFlag=%v\n", me.name, me.serverFlag)
    me.closeConn()
}

// beginRead is the read loop; open runs it on its own goroutine. It reads
// newline-terminated lines from the connection and decodes each with
// MsgDecoder. Every successfully decoded message is passed to the registered
// receive handler (if any) and appended to recvLogs. Lines that do not decode
// are skipped. Any read error requests shutdown and ends the loop.
func (me *tChatClient) beginRead() {
    reader := bufio.NewReader(me.conn)
    for {
        line, err := reader.ReadString('\n')
        if err != nil {
            me.closeConn()
            return
        }

        ok, msg := MsgDecoder.Decode(line)
        if !ok {
            continue
        }

        // Copy the handler first: postConnClosed may clear the field concurrently.
        if handler := me.recvHandler; handler != nil {
            handler(me, msg)
        }
        me.recvLogs = append(me.recvLogs, msg)
    }
}

// closeConn requests shutdown exactly once: the first caller moves closeFlag
// from 0 to 1 and signals the write loop; every later caller returns
// immediately.
//
// The signal is delivered by closing closeChan rather than sending on it.
// closeChan is unbuffered and its only receiver is the write loop, so a send
// would block the caller until the loop got around to receiving, and would
// block forever when the caller is the write loop itself (write error or
// timeout path). Closing never blocks, makes the receive in the write loop
// ready, and cannot happen twice because of the compare-and-swap guard.
func (me *tChatClient) closeConn() {
    if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
        return
    }
    close(me.closeChan)
}

// postConnClosed runs after the write loop has closed the connection. It
// prints a notice, invokes the registered close handler (if any), and then
// clears both handlers so no further callbacks are delivered.
func (me *tChatClient) postConnClosed() {
    fmt.Printf("tChatClient.postConnClosed, %v, serverFlag=%v\n", me.name, me.serverFlag)

    if onClose := me.closeHandler; onClose != nil {
        onClose(me)
    }

    me.closeHandler, me.recvHandler = nil, nil
}

IChatServer.go

定义聊天服务器接口, 为方便测试, 提供日志采集方法

package chat_server

// IChatServer is the chat server API.
type IChatServer interface {
    // Open starts listening for TCP connections on port.
    Open(port int) error
    // Broadcast sends msg to every connected client.
    Broadcast(msg IMsg)
    // Close stops listening and closes all client connections.
    Close()
    // GetLogs returns the log lines recorded by the server; provided for tests.
    GetLogs() []string
}

tChatServer.go

实现聊天服务器IChatServer

package chat_server

import (
    "errors"
    "fmt"
    "net"
    "sync"
    "sync/atomic"
)

// tChatServer implements IChatServer.
type tChatServer struct {
    // openFlag is switched 0 -> 1 by Open; closeFlag is switched 0 -> 1 by Close.
    openFlag int32
    closeFlag int32

    // clients holds the connected clients in its first clientCount slots;
    // slots freed by a disconnect are set to nil and reused later.
    clients []IChatClient
    clientCount int
    // clientLock guards clients and clientCount.
    clientLock *sync.RWMutex

    listener net.Listener
    // recvLogs records every message received from any client.
    recvLogs []IMsg

    // logs collects the lines written through logf.
    logs []string
}

// NewChatServer returns a server that is neither opened nor closed yet and
// has no clients. Call Open to start listening.
func NewChatServer() IChatServer {
    // Flags, clientCount and listener keep their zero values.
    server := new(tChatServer)
    server.clients = []IChatClient{}
    server.clientLock = new(sync.RWMutex)
    server.recvLogs = []IMsg{}
    return server
}

// Open starts listening for TCP connections on the given port (all
// interfaces) and launches the accept loop on its own goroutine.
//
// It returns an error if the server has already been opened or if the
// listener cannot be created. When listening fails, the opened flag is reset
// so the caller can retry (for example on another port) instead of being told
// "server already opened" forever.
func (me *tChatServer) Open(port int) error {
    if !atomic.CompareAndSwapInt32(&me.openFlag, 0, 1) {
        return errors.New("server already opened")
    }

    listener, err := net.Listen("tcp", fmt.Sprintf(":%v", port))
    if err != nil {
        // Nothing was started; undo the claim taken above.
        atomic.StoreInt32(&me.openFlag, 0)
        return err
    }

    me.listener = listener
    go me.beginListening()
    return nil
}

// logf formats a log line, stores it in me.logs for later inspection through
// GetLogs, and echoes it to standard output.
//
// NOTE(review): me.logs is appended without its own lock, and logf is called
// both with and without clientLock held — confirm whether callers can overlap.
func (me *tChatServer) logf(f string, args ...interface{}) {
    line := fmt.Sprintf(f, args...)
    me.logs = append(me.logs, line)
    fmt.Println(line)
}

// GetLogs returns the server's log lines. The internal slice itself is
// returned, not a copy.
func (me *tChatServer) GetLogs() []string {
    return me.logs
}

// isClosed reports whether Close has been called. closeFlag is set by Close
// with an atomic compare-and-swap while the accept loop polls it from its own
// goroutine, so it is loaded atomically here to avoid a data race.
func (me *tChatServer) isClosed() bool {
    return atomic.LoadInt32(&me.closeFlag) != 0
}

// isNotClosed is the negation of isClosed.
func (me *tChatServer) isNotClosed() bool {
    return !me.isClosed()
}

// beginListening is the accept loop; Open runs it on its own goroutine. It
// registers each accepted connection as a client until the server is closed.
// An Accept error closes the server and ends the loop.
func (me *tChatServer) beginListening() {
    for {
        if me.isClosed() {
            return
        }

        conn, err := me.listener.Accept()
        if err != nil {
            me.Close()
            return
        }

        me.handleIncomingConn(conn)
    }
}


// Close shuts the server down once: it stops the listener, which makes the
// accept loop's pending Accept fail and exit, and closes every connected
// client. Later calls are no-ops.
//
// The listener is nil when Close is called before a successful Open, so it is
// checked before use instead of panicking on a nil interface.
func (me *tChatServer) Close() {
    if !atomic.CompareAndSwapInt32(&me.closeFlag, 0, 1) {
        return
    }

    if me.listener != nil {
        // Shutting down; a listener Close error is not actionable here.
        _ = me.listener.Close()
    }
    me.closeAllClients()
}

// closeAllClients closes every registered client, clears its slot, and resets
// the client count to zero, all while holding the write lock.
func (me *tChatServer) closeAllClients() {
    me.clientLock.Lock()
    defer me.clientLock.Unlock()

    for idx := range me.clients {
        client := me.clients[idx]
        if client == nil {
            continue
        }
        client.Close()
        me.clients[idx] = nil
    }
    me.clientCount = 0
}


// handleIncomingConn wraps an accepted connection in a server-side client,
// attaches the server's message and close callbacks, and stores the client in
// the first free slot of me.clients (appending when there is none).
//
// NOTE(review): openChatClient starts the client's goroutines before the
// handlers are attached here — confirm that an early message cannot be missed.
func (me *tChatServer) handleIncomingConn(conn net.Conn) {
    // Create the client and hook it up to the server.
    client := openChatClient(conn, true)
    client.RecvHandler(me.handleClientMsg)
    client.CloseHandler(me.handleClientClosed)

    // The registry is shared with the close and broadcast paths.
    me.clientLock.Lock()
    defer me.clientLock.Unlock()

    // Reuse a slot freed by an earlier disconnect before growing the slice.
    if slot := me.clientCount; slot < len(me.clients) {
        me.clients[slot] = client
    } else {
        me.clients = append(me.clients, client)
    }
    me.clientCount++

    me.logf("tChatServer.handleIncomingConn, clientCount=%v", me.clientCount)
}

// handleClientMsg is the receive callback attached to every server-side
// client. It records the message, then either renames the sending client
// (*NameMsg) or relays the message to all clients (*ChatMsg). Other message
// types are only recorded.
func (me *tChatServer) handleClientMsg(client IChatClient, msg IMsg) {
    me.recvLogs = append(me.recvLogs, msg)

    switch m := msg.(type) {
    case *NameMsg:
        client.SetName(m.Name)

    case *ChatMsg:
        me.Broadcast(msg)
    }
}

// handleClientClosed is the close callback attached to every server-side
// client. It logs the disconnect and removes the client from the registry by
// moving the last active entry into its slot, which keeps the active clients
// packed at the front of me.clients. Finally it logs the new client count.
// If the registry is already empty, only the first log line is written.
func (me *tChatServer) handleClientClosed(client IChatClient) {
    me.logf("tChatServer.handleClientClosed, %s", client.GetName())

    me.clientLock.Lock()
    defer me.clientLock.Unlock()

    if me.clientCount <= 0 {
        return
    }

    tail := me.clientCount - 1
    for idx := range me.clients {
        if me.clients[idx] != client {
            continue
        }

        // When idx == tail this is a self-assignment followed by clearing the slot.
        me.clients[idx] = me.clients[tail]
        me.clients[tail] = nil
        me.clientCount--
        break
    }

    me.logf("tChatServer.handleClientClosed, %s, clientCount=%v", client.GetName(), me.clientCount)
}

// Broadcast hands msg to every registered client, including the one that
// sent it, while holding the read lock. Empty slots are skipped.
func (me *tChatServer) Broadcast(msg IMsg) {
    me.clientLock.RLock()
    defer me.clientLock.RUnlock()

    for _, client := range me.clients {
        if client == nil {
            continue
        }
        client.Send(msg)
    }
}

(未完待续)


本文来自:简书

感谢作者:老罗话编程

查看原文:手撸golang GO与微服务 ChatServer之2

相关阅读 >>

归并排序

手撸Golang 基本数据结构与算法 图的最短路径  狄克斯特拉算法

中国身份证号验证库

优雅的实现 Golang rest api 架构

Golang web框架有哪些?

Golang制作简单代理定制

一周 Go world 新鲜事

详解 Golang 通道 chan

jack liu's Golang personal summary notes

Golang 创建型设计模式 原型模式

更多相关阅读请进入《Go》频道 >>




打赏

取消

感谢您的支持,我会继续努力的!

扫码支持
扫码打赏,您说多少就多少

打开支付宝扫一扫,即可进行扫码打赏哦

分享从这里开始,精彩与您同在

评论

管理员已关闭评论功能...