Android 中使用 WebSocket 全面指南
本文是Android平台使用WebSocket的实践指南。WebSocket是一种支持全双工、低延迟实时通信的协议,广泛应用于即时通讯、在线游戏等场景。在Android开发中,推荐使用OkHttp库实现,它提供了完整的连接管理、心跳保活和异步回调机制。关键实践包括:实现指数退避的智能重连策略以应对网络波动,结合前台Service管理后台连接生命周期,使用响应式数据流(如Flow)处理消息以实现UI层解耦。此外,采用自定义子协议进行身份验证与消息路由,并对跨国网络进行TCP参数调优,是构建稳定、高效实时通信功能的核心要点。
博主博客
一、WebSocket 概述
1.1 什么是 WebSocket?
WebSocket 是一种建立在 TCP 连接基础上的全双工通信协议,具有以下核心特点:
- 基于 TCP:在传输层上提供可靠连接
- 全双工通信:客户端和服务端可以同时进行双向数据传输
- 持久连接:一次握手,持久连接,避免 HTTP 的重复连接开销
- 低延迟:适用于实时通信场景
1.2 全双工通信
全双工通信指通信双方可以同时、双向发送和接收数据,类比于电话通话,区别于对讲机式的半双工通信。
1.3 应用场景
- 即时通讯(聊天应用)
- 直播弹幕
- 实时数据监控(股票行情、物联网数据)
- 在线游戏
- 协同编辑工具
1.4 相关链接
二、WebSocket 协议详解
2.1 协议握手过程
WebSocket 通过 HTTP 协议升级完成握手:
客户端请求:
GET /chat HTTP/1.1
Host: server.example.com
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Key: dGhlIHNhbXBsZSBub25jZQ==
Sec-WebSocket-Version: 13
服务端响应:
HTTP/1.1 101 Switching Protocols
Upgrade: websocket
Connection: Upgrade
Sec-WebSocket-Accept: s3pPLMBiTxaQ9kYGzzhZRbK+xOo=
Sec-WebSocket-Accept 计算:
Sec-WebSocket-Accept = base64(sha1(Sec-WebSocket-Key + "258EAFA5-E914-47DA-95CA-C5AB0DC85B11"))
2.2 数据帧结构
0 1 2 3
0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1 2 3 4 5 6 7 8 9 0 1
+-+-+-+-+-------+-+-------------+-------------------------------+
|F|R|R|R| opcode|M| Payload len | Extended payload length |
|I|S|S|S| (4) |A| (7) | (16/64) |
|N|V|V|V| |S| | (if payload len==126/127) |
| |1|2|3| |K| | |
+-+-+-+-+-------+-+-------------+ - - - - - - - - - - - - - - - +
| Extended payload length continued, if payload len == 127 |
+ - - - - - - - - - - - - - - - +-------------------------------+
| |Masking-key, if MASK set to 1 |
+-------------------------------+-------------------------------+
| Masking-key (continued) | Payload Data |
+-------------------------------- - - - - - - - - - - - - - - - +
: Payload Data continued ... :
+ - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - +
| Payload Data continued ... |
+---------------------------------------------------------------+
各字段说明:
FIN (1位)
1:当前帧是消息的最后一帧0:消息还有后续帧
RSV1, RSV2, RSV3 (各1位)
- 保留位,用于协议扩展
- 必须为0,除非双方协商了扩展
Opcode (4位)
控制帧:
0x8:连接关闭0x9:Ping 帧(心跳检测)0xA:Pong 帧(心跳响应)
数据帧:
0x0:连续帧(分片)0x1:文本帧0x2:二进制帧0x3-0x7:保留(非控制帧)0xB-0xF:保留(控制帧)
MASK (1位)
1:Payload Data 经过掩码处理(客户端→服务端)0:未经过掩码处理(服务端→客户端)
Payload Length (7/7+16/7+64位)
0-125:实际数据长度126:后续2字节表示长度127:后续8字节表示长度
Masking-Key (0或4字节)
- MASK=1 时存在,用于数据解密
Payload Data
- 实际传输的数据
2.3 掩码处理算法
void mask(byte[] original, byte[] maskKey) {
for (int i = 0; i < original.length; i++) {
original[i] = (byte) (original[i] ^ maskKey[i % 4]);
}
}
2.4 消息分片
长消息需要分片传输:
- 第一帧:FIN=0, Opcode≠0
- 中间帧:FIN=0, Opcode=0
- 最后一帧:FIN=1, Opcode=0
三、Android 客户端实现
3.1 使用 OkHttp(推荐)
添加依赖
// 最新版本依赖(2024年推荐)
dependencies {
implementation 'com.squareup.okhttp3:okhttp:4.12.0'
implementation 'com.squareup.okhttp3:mockwebserver:4.12.0' // 测试用
}
注意:OkHttp 从 3.5.0 开始将 WebSocket 功能集成到核心库,无需额外依赖
基本使用示例
// Kotlin 版本(推荐)
class WebSocketManager {
private var webSocket: WebSocket? = null
private val client = OkHttpClient.Builder()
.pingInterval(30, TimeUnit.SECONDS) // 设置心跳间隔
.build()
fun connect(url: String) {
val request = Request.Builder()
.url(url)
.build()
val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
[email protected] = webSocket
Log.d("WebSocket", "连接已建立")
// 发送初始消息
webSocket.send("Hello Server")
}
override fun onMessage(webSocket: WebSocket, text: String) {
Log.d("WebSocket", "收到文本消息: $text")
// 处理文本消息
}
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
Log.d("WebSocket", "收到二进制消息: ${bytes.size()} bytes")
// 处理二进制消息(如图片、音频)
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
Log.d("WebSocket", "连接关闭中: $code - $reason")
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
Log.d("WebSocket", "连接已关闭: $code - $reason")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
Log.e("WebSocket", "连接失败", t)
// 实现重连逻辑
}
}
client.newWebSocket(request, listener)
}
fun sendMessage(message: String) {
webSocket?.send(message)
}
fun sendBinary(data: ByteArray) {
webSocket?.send(ByteString.of(*data))
}
fun close() {
webSocket?.close(1000, "正常关闭")
}
}
重连机制实现
class ReconnectingWebSocket(
private val url: String,
private val client: OkHttpClient
) {
private var webSocket: WebSocket? = null
private var isConnected = false
private var reconnectAttempts = 0
private val maxReconnectAttempts = 5
private val reconnectDelay = 2000L
fun connect() {
val request = Request.Builder().url(url).build()
val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
isConnected = true
reconnectAttempts = 0
Log.d("WebSocket", "连接成功")
}
override fun onFailure(webSocket: WebSocket, t: Throwable, response: Response?) {
isConnected = false
webSocket.cancel()
if (reconnectAttempts < maxReconnectAttempts) {
reconnectAttempts++
val delay = reconnectDelay * reconnectAttempts
Handler(Looper.getMainLooper()).postDelayed({
Log.d("WebSocket", "第${reconnectAttempts}次重连...")
connect()
}, delay)
}
}
override fun onClosed(webSocket: WebSocket, code: Int, reason: String) {
isConnected = false
}
}
webSocket = client.newWebSocket(request, listener)
}
}
3.2 使用 Java-WebSocket 库
添加依赖
dependencies {
implementation 'org.java-websocket:Java-WebSocket:1.5.4'
}
基本使用
public class MyWebSocketClient extends WebSocketClient {
public MyWebSocketClient(URI serverUri) {
super(serverUri);
}
@Override
public void onOpen(ServerHandshake handshakedata) {
Log.d("WebSocket", "连接已打开");
send("Hello Server");
}
@Override
public void onMessage(String message) {
Log.d("WebSocket", "收到消息: " + message);
}
@Override
public void onMessage(ByteBuffer bytes) {
Log.d("WebSocket", "收到二进制消息");
}
@Override
public void onClose(int code, String reason, boolean remote) {
Log.d("WebSocket", "连接关闭: " + reason);
}
@Override
public void onError(Exception ex) {
Log.e("WebSocket", "错误", ex);
}
}
// 使用示例
try {
MyWebSocketClient client = new MyWebSocketClient(
new URI("ws://server.address.com:8080")
);
client.connect();
} catch (URISyntaxException e) {
e.printStackTrace();
}
四、服务端模拟与测试
4.1 使用 MockWebServer 测试
class WebSocketTest {
private val mockWebServer = MockWebServer()
private val client = OkHttpClient.Builder().build()
@Test
fun testWebSocketCommunication() {
// 设置 Mock 服务端
mockWebServer.enqueue(
MockResponse()
.withWebSocketUpgrade(object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
// 服务端收到连接
webSocket.send("Welcome!")
}
override fun onMessage(webSocket: WebSocket, text: String) {
// 回显客户端消息
webSocket.send("Echo: $text")
}
})
)
// 启动服务端
mockWebServer.start()
val url = "ws://${mockWebServer.hostName}:${mockWebServer.port}/"
// 创建客户端连接
val request = Request.Builder().url(url).build()
val latch = CountDownLatch(1)
val receivedMessages = mutableListOf<String>()
val listener = object : WebSocketListener() {
override fun onOpen(webSocket: WebSocket, response: Response) {
webSocket.send("Hello")
}
override fun onMessage(webSocket: WebSocket, text: String) {
receivedMessages.add(text)
if (receivedMessages.size >= 2) {
latch.countDown()
}
}
}
client.newWebSocket(request, listener)
// 等待消息接收完成
latch.await(5, TimeUnit.SECONDS)
// 验证结果
assert(receivedMessages.contains("Welcome!"))
assert(receivedMessages.contains("Echo: Hello"))
// 清理
mockWebServer.shutdown()
}
}
4.2 完整测试流程示例
测试场景:完整的握手、消息交换、心跳、关闭流程
@Test
fun testCompleteWebSocketFlow() {
val mockWebServer = MockWebServer()
val executor = Executors.newSingleThreadExecutor()
mockWebServer.enqueue(
MockResponse().withWebSocketUpgrade(object : WebSocketListener() {
private lateinit var webSocket: WebSocket
override fun onOpen(webSocket: WebSocket, response: Response) {
this.webSocket = webSocket
Log.d("Test", "Server: 连接已建立")
}
override fun onMessage(webSocket: WebSocket, text: String) {
Log.d("Test", "Server: 收到消息: $text")
when (text) {
"command 1" -> {
executor.execute {
webSocket.send("replay command 1")
}
}
"command 2" -> {
executor.execute {
webSocket.send("ping")
}
}
}
}
override fun onMessage(webSocket: WebSocket, bytes: ByteString) {
// 处理二进制消息
}
override fun onClosing(webSocket: WebSocket, code: Int, reason: String) {
webSocket.close(code, reason)
}
})
)
mockWebServer.start()
// ... 客户端测试代码
}
五、OkHttp WebSocket 源码解析
5.1 核心接口
WebSocket 接口
public interface WebSocket {
// 发送文本消息
boolean send(String text);
// 发送二进制消息
boolean send(ByteString bytes);
// 关闭连接
boolean close(int code, String reason);
// 取消连接
void cancel();
}
WebSocketListener 接口
public abstract class WebSocketListener {
// 连接建立
public void onOpen(WebSocket webSocket, Response response) {}
// 收到文本消息
public void onMessage(WebSocket webSocket, String text) {}
// 收到二进制消息
public void onMessage(WebSocket webSocket, ByteString bytes) {}
// 连接关闭
public void onClosing(WebSocket webSocket, int code, String reason) {}
// 连接已关闭
public void onClosed(WebSocket webSocket, int code, String reason) {}
// 连接失败
public void onFailure(WebSocket webSocket, Throwable t, Response response) {}
}
5.2 连接建立过程
// RealWebSocket.kt 中的关键代码
internal fun connect(client: OkHttpClient, request: Request) {
// 1. 构造升级请求
val webSocketRequest = request.newBuilder()
.header("Upgrade", "websocket")
.header("Connection", "Upgrade")
.header("Sec-WebSocket-Key", key)
.header("Sec-WebSocket-Version", "13")
.build()
// 2. 发送 HTTP 请求
val call = client.newCall(webSocketRequest)
call.enqueue(object : Callback {
override fun onResponse(call: Call, response: Response) {
// 3. 验证响应
if (response.code != 101) {
// 协议升级失败
failWebSocket(response, ProtocolException(...))
return
}
// 4. 验证 Sec-WebSocket-Accept
val acceptHeader = response.header("Sec-WebSocket-Accept")
val expectedAccept = ...
if (acceptHeader != expectedAccept) {
failWebSocket(response, ProtocolException(...))
return
}
// 5. 创建 WebSocket 实例并开始读取
initReaderAndWriter(response)
}
})
}
5.3 消息读取循环
internal fun readLoop() {
try {
while (!closed) {
// 读取一帧数据
val frame = readFrame()
when (frame.opcode) {
OPCODE_TEXT -> listener.onMessage(this, frame.data.utf8())
OPCODE_BINARY -> listener.onMessage(this, frame.data)
OPCODE_CLOSE -> processCloseFrame(frame)
OPCODE_PING -> processPingFrame(frame)
OPCODE_PONG -> processPongFrame(frame)
}
}
} catch (e: Exception) {
// 处理异常
}
}
六、最佳实践与注意事项
6.1 线程安全
- WebSocket 回调可能在非主线程执行
- UI 更新需要切换到主线程
- 使用
runOnUiThread或Handler
webSocketListener = object : WebSocketListener() {
override fun onMessage(webSocket: WebSocket, text: String) {
runOnUiThread {
// 更新 UI
textView.text = text
}
}
}
6.2 心跳机制
val client = OkHttpClient.Builder()
.pingInterval(30, TimeUnit.SECONDS) // 自动心跳
.build()
6.3 连接状态管理
enum class ConnectionState {
DISCONNECTED,
CONNECTING,
CONNECTED,
RECONNECTING,
DISCONNECTING
}
class WebSocketManager {
private val _connectionState = MutableStateFlow(ConnectionState.DISCONNECTED)
val connectionState: StateFlow<ConnectionState> = _connectionState
fun connect() {
_connectionState.value = ConnectionState.CONNECTING
// ... 连接逻辑
}
}
6.4 错误处理与重连
private suspend fun connectWithRetry(
maxRetries: Int = 5,
initialDelay: Long = 1000
) {
var retryCount = 0
var delay = initialDelay
while (retryCount < maxRetries && !isConnected) {
try {
connect()
break
} catch (e: Exception) {
retryCount++
delay *= 2 // 指数退避
delay(delay)
}
}
}
6.5 消息队列管理
class MessageQueueManager {
private val messageQueue = LinkedBlockingQueue<String>()
private var isProcessing = false
fun sendMessage(message: String) {
messageQueue.offer(message)
processQueue()
}
private fun processQueue() {
if (isProcessing) return
GlobalScope.launch {
isProcessing = true
while (!messageQueue.isEmpty()) {
val message = messageQueue.poll()
webSocket?.send(message)
delay(100) // 控制发送频率
}
isProcessing = false
}
}
}
七、常见问题与解决方案
7.1 SSL/TLS 证书问题
val client = OkHttpClient.Builder()
.sslSocketFactory(
SSLContext.getInstance("TLS").apply {
init(null, null, null)
}.socketFactory,
TrustAllCerts() // 自定义信任管理器(仅测试环境使用)
)
.hostnameVerifier { _, _ -> true } // 仅测试环境使用
.build()
7.2 后台连接保持
// 使用前台服务保持连接
class WebSocketService : Service() {
private val notificationId = 1
override fun onStartCommand(intent: Intent?, flags: Int, startId: Int): Int {
startForeground(notificationId, createNotification())
// 建立 WebSocket 连接
return START_STICKY
}
private fun createNotification(): Notification {
return NotificationCompat.Builder(this, "websocket_channel")
.setContentTitle("WebSocket 服务运行中")
.setSmallIcon(R.drawable.ic_notification)
.build()
}
}
7.3 网络状态监听
class NetworkStateMonitor(context: Context) {
private val connectivityManager = context.getSystemService(
Context.CONNECTIVITY_SERVICE
) as ConnectivityManager
fun observeNetworkState(): Flow<Boolean> = callbackFlow {
val callback = object : ConnectivityManager.NetworkCallback() {
override fun onAvailable(network: Network) {
trySend(true)
}
override fun onLost(network: Network) {
trySend(false)
}
}
val request = NetworkRequest.Builder()
.addCapability(NetworkCapabilities.NET_CAPABILITY_INTERNET)
.build()
connectivityManager.registerNetworkCallback(request, callback)
awaitClose {
connectivityManager.unregisterNetworkCallback(callback)
}
}
}
八、性能优化建议
8.1 消息压缩
fun compressMessage(message: String): ByteArray {
val outputStream = ByteArrayOutputStream()
GZIPOutputStream(outputStream).use {
it.write(message.toByteArray(Charsets.UTF_8))
}
return outputStream.toByteArray()
}
fun decompressMessage(compressed: ByteArray): String {
ByteArrayInputStream(compressed).use { input ->
GZIPInputStream(input).use { gzip ->
return gzip.readBytes().toString(Charsets.UTF_8)
}
}
}
8.2 连接池管理
object WebSocketPool {
private val connections = mutableMapOf<String, WebSocket>()
private val lock = ReentrantLock()
fun getConnection(url: String): WebSocket? {
lock.withLock {
return connections[url]
}
}
fun addConnection(url: String, webSocket: WebSocket) {
lock.withLock {
connections[url] = webSocket
}
}
fun removeConnection(url: String) {
lock.withLock {
connections.remove(url)
}
}
}
九、总结
WebSocket 在 Android 开发中为实时通信提供了强大的支持。通过合理使用 OkHttp 或 Java-WebSocket 库,结合良好的架构设计,可以构建出稳定、高效的实时通信功能。
关键要点:
- 理解协议:掌握 WebSocket 协议细节有助于问题排查
- 选择合适库:OkHttp 是官方推荐,功能全面;Java-WebSocket 更轻量
- 处理连接状态:实现完整的连接状态管理和重连机制
- 注意线程安全:WebSocket 回调在后台线程,UI 更新需切回主线程
- 性能优化:合理使用心跳、消息队列、连接池等技术
Android 中使用 WebSocket 全面指南
https://blog.uso6.com/archives/android-zhong-shi-yong-websocket-quan-mian-zhi-nan
评论