本文是Android平台使用WebSocket的实践指南。WebSocket是一种支持全双工、低延迟实时通信的协议,广泛应用于即时通讯、在线游戏等场景。在Android开发中,推荐使用OkHttp库实现,它提供了完整的连接管理、心跳保活和异步回调机制。关键实践包括:实现指数退避的智能重连策略以应对网络波动,结合前台Service管理后台连接生命周期,使用响应式数据流(如Flow)处理消息以实现UI层解耦。此外,采用自定义子协议进行身份验证与消息路由,并对跨国网络进行TCP参数调优,是构建稳定、高效实时通信功能的核心要点。

博主博客

一、WebSocket 概述

1.1 什么是 WebSocket?

WebSocket 是一种建立在 TCP 连接基础上的全双工通信协议,具有以下核心特点:

  • 基于 TCP:在传输层上提供可靠连接
  • 全双工通信:客户端和服务端可以同时进行双向数据传输
  • 持久连接:一次握手,持久连接,避免 HTTP 的重复连接开销
  • 低延迟:适用于实时通信场景

1.2 全双工通信

全双工通信指通信双方可以同时、双向发送和接收数据,类比于电话通话,区别于对讲机式的半双工通信。

1.3 应用场景

  • 即时通讯(聊天应用)
  • 直播弹幕
  • 实时数据监控(股票行情、物联网数据)
  • 在线游戏
  • 协同编辑工具

1.4 相关链接

二、WebSocket 协议详解

2.1 协议握手过程

WebSocket 通过 HTTP 协议升级完成握手:

客户端请求

GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13

服务端响应

HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=

Sec-WebSocket-Accept 计算

Sec-WebSocket-Accept = base64(sha1(Sec-WebSocket-Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))

2.2 数据帧结构

 0                   1                   2                   3
 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len |    Extended payload length    |
|I|S|S|S|  (4)  |A|     (7)     |             (16/64)           |
|N|V|V|V|       |S|             |   (if payload len==126/127)   |
| |1|2|3|       |K|             |                               |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
|     Extended payload length continued, if payload len == 127  |
+ - - - - - - - - - - - - - - - +-------------------------------+
|                               |Masking-key, if MASK set to 1  |
+-------------------------------+-------------------------------+
| Masking-key (continued)       |          Payload Data         |
+-------------------------------- - - - - - - - - - - - - - - - +
:                     Payload Data continued ...                :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
|                     Payload Data continued ...                |
+---------------------------------------------------------------+

各字段说明:

FIN (1位)

  • 1:当前帧是消息的最后一帧
  • 0:消息还有后续帧

RSV1, RSV2, RSV3 (各1位)

  • 保留位,用于协议扩展
  • 必须为0,除非双方协商了扩展

Opcode (4位)
控制帧:

  • 0x8:连接关闭
  • 0x9:Ping 帧(心跳检测)
  • 0xA:Pong 帧(心跳响应)

数据帧:

  • 0x0:连续帧(分片)
  • 0x1:文本帧
  • 0x2:二进制帧
  • 0x3-0x7:保留(非控制帧)
  • 0xB-0xF:保留(控制帧)

MASK (1位)

  • 1:Payload Data 经过掩码处理(客户端→服务端)
  • 0:未经过掩码处理(服务端→客户端)

Payload Length (7/7+16/7+64位)

  • 0-125:实际数据长度
  • 126:后续2字节表示长度
  • 127:后续8字节表示长度

Masking-Key (0或4字节)

  • MASK=1 时存在,用于数据解密

Payload Data

  • 实际传输的数据

2.3 掩码处理算法

void mask(byte[] original, byte[] maskKey) {
    for (int i = 0; i < original.length; i++) {
        original[i] = (byte) (original[i] ^ maskKey[i % 4]);
    }
}

2.4 消息分片

长消息需要分片传输:

  • 第一帧:FIN=0, Opcode≠0
  • 中间帧:FIN=0, Opcode=0
  • 最后一帧:FIN=1, Opcode=0

三、Android 客户端实现

3.1 使用 OkHttp(推荐)

添加依赖

// 最新版本依赖(2024年推荐)
dependencies {
    implementation 'com.squareup.okhttp3:okhttp:4.12.0'
    implementation 'com.squareup.okhttp3:mockwebserver:4.12.0' // 测试用
}

注意:OkHttp 从 3.5.0 开始将 WebSocket 功能集成到核心库,无需额外依赖

基本使用示例

// Kotlin 版本(推荐)
class WebSocketManager {
    private var webSocket: WebSocket? = null
    private val client = OkHttpClient.Builder()
        .pingInterval(30, TimeUnit.SECONDS) // 设置心跳间隔
        .build()
    
    fun connect(url: String) {
        val request = Request.Builder()
            .url(url)
            .build()
        
        val listener = object : WebSocketListener() {
            override fun onOpen(webSocket: WebSocket, response: Response) {
                [email protected] = webSocket
                Log.d("WebSocket", "连接已建立")
                
                // 发送初始消息
                webSocket.send("Hello Server")
            }
            
            override fun onMessage(webSocket: WebSocket, text: String) {
                Log.d("WebSocket", "收到文本消息: $text")
                // 处理文本消息
            }
            
            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                Log.d("WebSocket", "收到二进制消息: ${bytes.size()} bytes")
                // 处理二进制消息(如图片、音频)
            }
            
            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                Log.d("WebSocket", "连接关闭中: $code - $reason")
            }
            
            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                Log.d("WebSocket", "连接已关闭: $code - $reason")
            }
            
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                Log.e("WebSocket", "连接失败", t)
                // 实现重连逻辑
            }
        }
        
        client.newWebSocket(request, listener)
    }
    
    fun sendMessage(message: String) {
        webSocket?.send(message)
    }
    
    fun sendBinary(data: ByteArray) {
        webSocket?.send(ByteString.of(*data))
    }
    
    fun close() {
        webSocket?.close(1000, "正常关闭")
    }
}

重连机制实现

class ReconnectingWebSocket(
    private val url: String,
    private val client: OkHttpClient
) {
    private var webSocket: WebSocket? = null
    private var isConnected = false
    private var reconnectAttempts = 0
    private val maxReconnectAttempts = 5
    private val reconnectDelay = 2000L
    
    fun connect() {
        val request = Request.Builder().url(url).build()
        
        val listener = object : WebSocketListener() {
            override fun onOpen(webSocket: WebSocket, response: Response) {
                isConnected = true
                reconnectAttempts = 0
                Log.d("WebSocket", "连接成功")
            }
            
            override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
                isConnected = false
                webSocket.cancel()
                
                if (reconnectAttempts < maxReconnectAttempts) {
                    reconnectAttempts++
                    val delay = reconnectDelay * reconnectAttempts
                    
                    Handler(Looper.getMainLooper()).postDelayed({
                        Log.d("WebSocket", "第${reconnectAttempts}次重连...")
                        connect()
                    }, delay)
                }
            }
            
            override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
                isConnected = false
            }
        }
        
        webSocket = client.newWebSocket(request, listener)
    }
}

3.2 使用 Java-WebSocket 库

添加依赖

dependencies {
    implementation 'org.java-websocket:Java-WebSocket:1.5.4'
}

基本使用

public class MyWebSocketClient extends WebSocketClient {
    
    public MyWebSocketClient(URI serverUri) {
        super(serverUri);
    }
    
    @Override
    public void onOpen(ServerHandshake handshakedata) {
        Log.d("WebSocket", "连接已打开");
        send("Hello Server");
    }
    
    @Override
    public void onMessage(String message) {
        Log.d("WebSocket", "收到消息: " + message);
    }
    
    @Override
    public void onMessage(ByteBuffer bytes) {
        Log.d("WebSocket", "收到二进制消息");
    }
    
    @Override
    public void onClose(int code, String reason, boolean remote) {
        Log.d("WebSocket", "连接关闭: " + reason);
    }
    
    @Override
    public void onError(Exception ex) {
        Log.e("WebSocket", "错误", ex);
    }
}

// 使用示例
try {
    MyWebSocketClient client = new MyWebSocketClient(
        new URI("ws://server.address.com:8080")
    );
    client.connect();
} catch (URISyntaxException e) {
    e.printStackTrace();
}

四、服务端模拟与测试

4.1 使用 MockWebServer 测试

class WebSocketTest {
    private val mockWebServer = MockWebServer()
    private val client = OkHttpClient.Builder().build()
    
    @Test
    fun testWebSocketCommunication() {
        // 设置 Mock 服务端
        mockWebServer.enqueue(
            MockResponse()
                .withWebSocketUpgrade(object : WebSocketListener() {
                    override fun onOpen(webSocket: WebSocket, response: Response) {
                        // 服务端收到连接
                        webSocket.send("Welcome!")
                    }
                    
                    override fun onMessage(webSocket: WebSocket, text: String) {
                        // 回显客户端消息
                        webSocket.send("Echo: $text")
                    }
                })
        )
        
        // 启动服务端
        mockWebServer.start()
        val url = "ws://${mockWebServer.hostName}:${mockWebServer.port}/"
        
        // 创建客户端连接
        val request = Request.Builder().url(url).build()
        val latch = CountDownLatch(1)
        val receivedMessages = mutableListOf<String>()
        
        val listener = object : WebSocketListener() {
            override fun onOpen(webSocket: WebSocket, response: Response) {
                webSocket.send("Hello")
            }
            
            override fun onMessage(webSocket: WebSocket, text: String) {
                receivedMessages.add(text)
                if (receivedMessages.size >= 2) {
                    latch.countDown()
                }
            }
        }
        
        client.newWebSocket(request, listener)
        
        // 等待消息接收完成
        latch.await(5, TimeUnit.SECONDS)
        
        // 验证结果
        assert(receivedMessages.contains("Welcome!"))
        assert(receivedMessages.contains("Echo: Hello"))
        
        // 清理
        mockWebServer.shutdown()
    }
}

4.2 完整测试流程示例

测试场景:完整的握手、消息交换、心跳、关闭流程

@Test
fun testCompleteWebSocketFlow() {
    val mockWebServer = MockWebServer()
    val executor = Executors.newSingleThreadExecutor()
    
    mockWebServer.enqueue(
        MockResponse().withWebSocketUpgrade(object : WebSocketListener() {
            private lateinit var webSocket: WebSocket
            
            override fun onOpen(webSocket: WebSocket, response: Response) {
                this.webSocket = webSocket
                Log.d("Test", "Server: 连接已建立")
            }
            
            override fun onMessage(webSocket: WebSocket, text: String) {
                Log.d("Test", "Server: 收到消息: $text")
                
                when (text) {
                    "command 1" -> {
                        executor.execute {
                            webSocket.send("replay command 1")
                        }
                    }
                    "command 2" -> {
                        executor.execute {
                            webSocket.send("ping")
                        }
                    }
                }
            }
            
            override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
                // 处理二进制消息
            }
            
            override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
                webSocket.close(code, reason)
            }
        })
    )
    
    mockWebServer.start()
    
    // ... 客户端测试代码
}

五、OkHttp WebSocket 源码解析

5.1 核心接口

WebSocket 接口

public interface WebSocket {
    // 发送文本消息
    boolean send(String text);
    
    // 发送二进制消息
    boolean send(ByteString bytes);
    
    // 关闭连接
    boolean close(int code, String reason);
    
    // 取消连接
    void cancel();
}

WebSocketListener 接口

public abstract class WebSocketListener {
    // 连接建立
    public void onOpen(WebSocket webSocket, Response response) {}
    
    // 收到文本消息
    public void onMessage(WebSocket webSocket, String text) {}
    
    // 收到二进制消息
    public void onMessage(WebSocket webSocket, ByteString bytes) {}
    
    // 连接关闭
    public void onClosing(WebSocket webSocket, int code, String reason) {}
    
    // 连接已关闭
    public void onClosed(WebSocket webSocket, int code, String reason) {}
    
    // 连接失败
    public void onFailure(WebSocket webSocket, Throwable t, Response response) {}
}

5.2 连接建立过程

// RealWebSocket.kt 中的关键代码
internal fun connect(client: OkHttpClient, request: Request) {
    // 1. 构造升级请求
    val webSocketRequest = request.newBuilder()
        .header("Upgrade", "websocket")
        .header("Connection", "Upgrade")
        .header("Sec-WebSocket-Key", key)
        .header("Sec-WebSocket-Version", "13")
        .build()
    
    // 2. 发送 HTTP 请求
    val call = client.newCall(webSocketRequest)
    call.enqueue(object : Callback {
        override fun onResponse(call: Call, response: Response) {
            // 3. 验证响应
            if (response.code != 101) {
                // 协议升级失败
                failWebSocket(response, ProtocolException(...))
                return
            }
            
            // 4. 验证 Sec-WebSocket-Accept
            val acceptHeader = response.header("Sec-WebSocket-Accept")
            val expectedAccept = ...
            if (acceptHeader != expectedAccept) {
                failWebSocket(response, ProtocolException(...))
                return
            }
            
            // 5. 创建 WebSocket 实例并开始读取
            initReaderAndWriter(response)
        }
    })
}

5.3 消息读取循环

internal fun readLoop() {
    try {
        while (!closed) {
            // 读取一帧数据
            val frame = readFrame()
            
            when (frame.opcode) {
                OPCODE_TEXT -> listener.onMessage(this, frame.data.utf8())
                OPCODE_BINARY -> listener.onMessage(this, frame.data)
                OPCODE_CLOSE -> processCloseFrame(frame)
                OPCODE_PING -> processPingFrame(frame)
                OPCODE_PONG -> processPongFrame(frame)
            }
        }
    } catch (e: Exception) {
        // 处理异常
    }
}

六、最佳实践与注意事项

6.1 线程安全

  • WebSocket 回调可能在非主线程执行
  • UI 更新需要切换到主线程
  • 使用 runOnUiThreadHandler
webSocketListener = object : WebSocketListener() {
    override fun onMessage(webSocket: WebSocket, text: String) {
        runOnUiThread {
            // 更新 UI
            textView.text = text
        }
    }
}

6.2 心跳机制

val client = OkHttpClient.Builder()
    .pingInterval(30, TimeUnit.SECONDS) // 自动心跳
    .build()

6.3 连接状态管理

enum class ConnectionState {
    DISCONNECTED,
    CONNECTING,
    CONNECTED,
    RECONNECTING,
    DISCONNECTING
}

class WebSocketManager {
    private val _connectionState = MutableStateFlow(ConnectionState.DISCONNECTED)
    val connectionState: StateFlow<ConnectionState> = _connectionState
    
    fun connect() {
        _connectionState.value = ConnectionState.CONNECTING
        // ... 连接逻辑
    }
}

6.4 错误处理与重连

private suspend fun connectWithRetry(
    maxRetries: Int = 5,
    initialDelay: Long = 1000
) {
    var retryCount = 0
    var delay = initialDelay
    
    while (retryCount < maxRetries && !isConnected) {
        try {
            connect()
            break
        } catch (e: Exception) {
            retryCount++
            delay *= 2 // 指数退避
            
            delay(delay)
        }
    }
}

6.5 消息队列管理

class MessageQueueManager {
    private val messageQueue = LinkedBlockingQueue<String>()
    private var isProcessing = false
    
    fun sendMessage(message: String) {
        messageQueue.offer(message)
        processQueue()
    }
    
    private fun processQueue() {
        if (isProcessing) return
        
        GlobalScope.launch {
            isProcessing = true
            while (!messageQueue.isEmpty()) {
                val message = messageQueue.poll()
                webSocket?.send(message)
                delay(100) // 控制发送频率
            }
            isProcessing = false
        }
    }
}

七、常见问题与解决方案

7.1 SSL/TLS 证书问题

val client = OkHttpClient.Builder()
    .sslSocketFactory(
        SSLContext.getInstance("TLS").apply {
            init(null, null, null)
        }.socketFactory,
        TrustAllCerts() // 自定义信任管理器(仅测试环境使用)
    )
    .hostnameVerifier { _, _ -> true } // 仅测试环境使用
    .build()

7.2 后台连接保持

// 使用前台服务保持连接
class WebSocketService : Service() {
    private val notificationId = 1
    
    override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
        startForeground(notificationId, createNotification())
        // 建立 WebSocket 连接
        return START_STICKY
    }
    
    private fun createNotification(): Notification {
        return NotificationCompat.Builder(this, "websocket_channel")
            .setContentTitle("WebSocket 服务运行中")
            .setSmallIcon(R.drawable.ic_notification)
            .build()
    }
}

7.3 网络状态监听

class NetworkStateMonitor(context: Context) {
    private val connectivityManager = context.getSystemService(
        Context.CONNECTIVITY_SERVICE
    ) as ConnectivityManager
    
    fun observeNetworkState(): Flow<Boolean> = callbackFlow {
        val callback = object : ConnectivityManager.NetworkCallback() {
            override fun onAvailable(network: Network) {
                trySend(true)
            }
            
            override fun onLost(network: Network) {
                trySend(false)
            }
        }
        
        val request = NetworkRequest.Builder()
            .addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
            .build()
        
        connectivityManager.registerNetworkCallback(request, callback)
        
        awaitClose {
            connectivityManager.unregisterNetworkCallback(callback)
        }
    }
}

八、性能优化建议

8.1 消息压缩

fun compressMessage(message: String): ByteArray {
    val outputStream = ByteArrayOutputStream()
    GZIPOutputStream(outputStream).use {
        it.write(message.toByteArray(Charsets.UTF_8))
    }
    return outputStream.toByteArray()
}

fun decompressMessage(compressed: ByteArray): String {
    ByteArrayInputStream(compressed).use { input ->
        GZIPInputStream(input).use { gzip ->
            return gzip.readBytes().toString(Charsets.UTF_8)
        }
    }
}

8.2 连接池管理

object WebSocketPool {
    private val connections = mutableMapOf<String, WebSocket>()
    private val lock = ReentrantLock()
    
    fun getConnection(url: String): WebSocket? {
        lock.withLock {
            return connections[url]
        }
    }
    
    fun addConnection(url: String, webSocket: WebSocket) {
        lock.withLock {
            connections[url] = webSocket
        }
    }
    
    fun removeConnection(url: String) {
        lock.withLock {
            connections.remove(url)
        }
    }
}

九、总结

WebSocket 在 Android 开发中为实时通信提供了强大的支持。通过合理使用 OkHttp 或 Java-WebSocket 库,结合良好的架构设计,可以构建出稳定、高效的实时通信功能。

关键要点:

  1. 理解协议:掌握 WebSocket 协议细节有助于问题排查
  2. 选择合适库:OkHttp 是官方推荐,功能全面;Java-WebSocket 更轻量
  3. 处理连接状态:实现完整的连接状态管理和重连机制
  4. 注意线程安全:WebSocket 回调在后台线程,UI 更新需切回主线程
  5. 性能优化:合理使用心跳、消息队列、连接池等技术