新聞中心
本文是該系列的第五篇。

創(chuàng)新互聯(lián)專注于企業(yè)成都全網(wǎng)營銷推廣、網(wǎng)站重做改版、甘南網(wǎng)站定制設(shè)計(jì)、自適應(yīng)品牌網(wǎng)站建設(shè)、html5、商城系統(tǒng)網(wǎng)站開發(fā)、集團(tuán)公司官網(wǎng)建設(shè)、成都外貿(mào)網(wǎng)站建設(shè)、高端網(wǎng)站制作、響應(yīng)式網(wǎng)頁設(shè)計(jì)等建站業(yè)務(wù),價(jià)格優(yōu)惠性價(jià)比高,為甘南等各大城市提供網(wǎng)站開發(fā)制作服務(wù)。
- 第一篇: 模式
- 第二篇: OAuth
- 第三篇: 對(duì)話
- 第四篇: 消息
對(duì)于實(shí)時(shí)消息,我們將使用 服務(wù)器發(fā)送事件Server-Sent Events。這是一個(gè)打開的連接,我們可以在其中傳輸數(shù)據(jù)流。我們會(huì)有個(gè)端點(diǎn),用戶會(huì)在其中訂閱發(fā)送給他的所有消息。
消息戶端
在 HTTP 部分之前,讓我們先編寫一個(gè)映射map ,讓所有客戶端都監(jiān)聽消息。 像這樣全局初始化:
- type MessageClient struct {
- Messages chan Message
- UserID string
- }
- var messageClients sync.Map
已創(chuàng)建的新消息
還記得在 上一篇文章 中,當(dāng)我們創(chuàng)建這條消息時(shí),我們留下了一個(gè) “TODO” 注釋。在那里,我們將使用這個(gè)函數(shù)來調(diào)度一個(gè) goroutine。
- go messageCreated(message)
把這行代碼插入到我們留注釋的位置。
- func messageCreated(message Message) error {
- if err := db.QueryRow(`
- SELECT user_id FROM participants
- WHERE user_id != $1 and conversation_id = $2
- `, message.UserID, message.ConversationID).
- Scan(&message.ReceiverID); err != nil {
- return err
- }
- go broadcastMessage(message)
- return nil
- }
- func broadcastMessage(message Message) {
- messageClients.Range(func(key, _ interface{}) bool {
- client := key.(*MessageClient)
- if client.UserID == message.ReceiverID {
- client.Messages <- message
- }
- return true
- })
- }
該函數(shù)查詢接收者 ID(其他參與者 ID),并將消息發(fā)送給所有客戶端。
訂閱消息
讓我們轉(zhuǎn)到 main() 函數(shù)并添加以下路由:
- router.HandleFunc("GET", "/api/messages", guard(subscribeToMessages))
此端點(diǎn)處理 /api/messages 上的 GET 請(qǐng)求。請(qǐng)求應(yīng)該是一個(gè) EventSource 連接。它用一個(gè)事件流響應(yīng),其中的數(shù)據(jù)是 JSON 格式的。
- func subscribeToMessages(w http.ResponseWriter, r *http.Request) {
- if a := r.Header.Get("Accept"); !strings.Contains(a, "text/event-stream") {
- http.Error(w, "This endpoint requires an EventSource connection", http.StatusNotAcceptable)
- return
- }
- f, ok := w.(http.Flusher)
- if !ok {
- respondError(w, errors.New("streaming unsupported"))
- return
- }
- ctx := r.Context()
- authUserID := ctx.Value(keyAuthUserID).(string)
- h := w.Header()
- h.Set("Cache-Control", "no-cache")
- h.Set("Connection", "keep-alive")
- h.Set("Content-Type", "text/event-stream")
- messages := make(chan Message)
- defer close(messages)
- client := &MessageClient{Messages: messages, UserID: authUserID}
- messageClients.Store(client, nil)
- defer messageClients.Delete(client)
- for {
- select {
- case <-ctx.Done():
- return
- case message := <-messages:
- if b, err := json.Marshal(message); err != nil {
- log.Printf("could not marshall message: %v\n", err)
- fmt.Fprintf(w, "event: error\ndata: %v\n\n", err)
- } else {
- fmt.Fprintf(w, "data: %s\n\n", b)
- }
- f.Flush()
- }
- }
- }
首先,它檢查請(qǐng)求頭是否正確,并檢查服務(wù)器是否支持流式傳輸。我們創(chuàng)建一個(gè)消息通道,用它來構(gòu)建一個(gè)客戶端,并將其存儲(chǔ)在客戶端映射中。每當(dāng)創(chuàng)建新消息時(shí),它都會(huì)進(jìn)入這個(gè)通道,因此我們可以通過 for-select 循環(huán)從中讀取。
服務(wù)器發(fā)送事件Server-Sent Events使用以下格式發(fā)送數(shù)據(jù):
- data: some data here\n\n
我們以 JSON 格式發(fā)送:
- data: {"foo":"bar"}\n\n
我們使用 fmt.Fprintf() 以這種格式寫入響應(yīng)寫入器writter,并在循環(huán)的每次迭代中刷新數(shù)據(jù)。
這個(gè)循環(huán)會(huì)一直運(yùn)行,直到使用請(qǐng)求上下文關(guān)閉連接為止。我們延遲了通道的關(guān)閉和客戶端的刪除,因此,當(dāng)循環(huán)結(jié)束時(shí),通道將被關(guān)閉,客戶端不會(huì)收到更多的消息。
注意,服務(wù)器發(fā)送事件Server-Sent Events(EventSource)的 JavaScript API 不支持設(shè)置自定義請(qǐng)求頭,所以我們不能設(shè)置 Authorization: Bearer 。這就是為什么 guard() 中間件也會(huì)從 URL 查詢字符串中讀取令牌的原因。
實(shí)時(shí)消息部分到此結(jié)束。我想說的是,這就是后端的全部?jī)?nèi)容。但是為了編寫前端代碼,我將再增加一個(gè)登錄端點(diǎn):一個(gè)僅用于開發(fā)的登錄。
- 源代碼
名稱欄目:構(gòu)建一個(gè)即時(shí)消息應(yīng)用(五):實(shí)時(shí)消息
網(wǎng)站URL:http://m.fisionsoft.com.cn/article/dhjehec.html


咨詢
建站咨詢
