WebChannel独立示例#
在浏览器中运行的远程客户端与服务器之间简单的聊天。
# Copyright (C) 2016 Klarälvdalens Datakonsult AB, a KDAB Group company, [email protected], author Milian Wolff <[email protected]>
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
import os
import sys
from PySide6.QtWidgets import QApplication
from PySide6.QtGui import QDesktopServices
from PySide6.QtNetwork import QHostAddress, QSslSocket
from PySide6.QtCore import (QFile, QFileInfo, QUrl)
from PySide6.QtWebChannel import QWebChannel
from PySide6.QtWebSockets import QWebSocketServer
from dialog import Dialog
from core import Core
from websocketclientwrapper import WebSocketClientWrapper
if __name__ == '__main__':
app = QApplication(sys.argv)
if not QSslSocket.supportsSsl():
print('The example requires SSL support.')
sys.exit(-1)
cur_dir = os.path.dirname(os.path.abspath(__file__))
js_file_info = QFileInfo(f"{cur_dir}/qwebchannel.js")
if not js_file_info.exists():
QFile.copy(":/qtwebchannel/qwebchannel.js",
js_file_info.absoluteFilePath())
# setup the QWebSocketServer
server = QWebSocketServer("QWebChannel Standalone Example Server",
QWebSocketServer.NonSecureMode)
if not server.listen(QHostAddress.LocalHost, 12345):
print("Failed to open web socket server.")
sys.exit(-1)
# wrap WebSocket clients in QWebChannelAbstractTransport objects
client_wrapper = WebSocketClientWrapper(server)
# setup the channel
channel = QWebChannel()
client_wrapper.client_connected.connect(channel.connectTo)
# setup the UI
dialog = Dialog()
# setup the core and publish it to the QWebChannel
core = Core(dialog)
channel.registerObject("core", core)
# open a browser window with the client HTML page
url = QUrl.fromLocalFile(f"{cur_dir}/index.html")
QDesktopServices.openUrl(url)
display_url = url.toDisplayString()
message = f"Initialization complete, opening browser at {display_url}."
dialog.display_message(message)
dialog.show()
sys.exit(app.exec())
# Copyright (C) 2017 Klarälvdalens Datakonsult AB, a KDAB Group company, [email protected], author Milian Wolff <[email protected]>
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from PySide6.QtCore import QObject, Signal, Slot
class Core(QObject):
"""An instance of this class gets published over the WebChannel and is then
accessible to HTML clients."""
sendText = Signal(str)
def __init__(self, dialog, parent=None):
super().__init__(parent)
self._dialog = dialog
self._dialog.send_text.connect(self._emit_send_text)
@Slot(str)
def _emit_send_text(self, text):
self.sendText.emit(text)
@Slot(str)
def receiveText(self, text):
self._dialog.display_message(f"Received message: {text}")
# Copyright (C) 2017 Klarälvdalens Datakonsult AB, a KDAB Group company, [email protected], author Milian Wolff <[email protected]>
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from PySide6.QtCore import Signal, Slot
from PySide6.QtWidgets import QDialog
from ui_dialog import Ui_Dialog
class Dialog(QDialog):
send_text = Signal(str)
def __init__(self, parent=None):
super().__init__(parent)
self._ui = Ui_Dialog()
self._ui.setupUi(self)
self._ui.send.clicked.connect(self.clicked)
self._ui.input.returnPressed.connect(self._ui.send.animateClick)
@Slot(str)
def display_message(self, message):
self._ui.output.appendPlainText(message)
@Slot()
def clicked(self):
text = self._ui.input.text()
if not text:
return
self.send_text.emit(text)
self.display_message(f"Sent message: {text}")
self._ui.input.clear()
# Copyright (C) 2017 Klarälvdalens Datakonsult AB, a KDAB Group company, [email protected], author Milian Wolff <[email protected]>
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from PySide6.QtCore import QObject, Signal, Slot
from websockettransport import WebSocketTransport
class WebSocketClientWrapper(QObject):
"""Wraps connected QWebSockets clients in WebSocketTransport objects.
This code is all that is required to connect incoming WebSockets to
the WebChannel. Any kind of remote JavaScript client that supports
WebSockets can thus receive messages and access the published objects.
"""
client_connected = Signal(WebSocketTransport)
def __init__(self, server, parent=None):
"""Construct the client wrapper with the given parent. All clients
connecting to the QWebSocketServer will be automatically wrapped
in WebSocketTransport objects."""
super().__init__(parent)
self._server = server
self._server.newConnection.connect(self.handle_new_connection)
self._transports = []
@Slot()
def handle_new_connection(self):
"""Wrap an incoming WebSocket connection in a WebSocketTransport
object."""
socket = self._server.nextPendingConnection()
transport = WebSocketTransport(socket)
self._transports.append(transport)
self.client_connected.emit(transport)
# Copyright (C) 2017 Klarälvdalens Datakonsult AB, a KDAB Group company, [email protected], author Milian Wolff <[email protected]>
# Copyright (C) 2022 The Qt Company Ltd.
# SPDX-License-Identifier: LicenseRef-Qt-Commercial OR BSD-3-Clause
from PySide6.QtWebChannel import QWebChannelAbstractTransport
from PySide6.QtCore import QByteArray, QJsonDocument, Slot
class WebSocketTransport(QWebChannelAbstractTransport):
"""QWebChannelAbstractSocket implementation using a QWebSocket internally.
The transport delegates all messages received over the QWebSocket over
its textMessageReceived signal. Analogously, all calls to
sendTextMessage will be sent over the QWebSocket to the remote client.
"""
def __init__(self, socket):
"""Construct the transport object and wrap the given socket.
The socket is also set as the parent of the transport object."""
super().__init__(socket)
self._socket = socket
self._socket.textMessageReceived.connect(self.text_message_received)
self._socket.disconnected.connect(self._disconnected)
def __del__(self):
"""Destroys the WebSocketTransport."""
self._socket.deleteLater()
def _disconnected(self):
self.deleteLater()
def sendMessage(self, message):
"""Serialize the JSON message and send it as a text message via the
WebSocket to the client."""
doc = QJsonDocument(message)
json_message = str(doc.toJson(QJsonDocument.Compact), "utf-8")
self._socket.sendTextMessage(json_message)
@Slot(str)
def text_message_received(self, message_data_in):
"""Deserialize the stringified JSON messageData and emit
messageReceived."""
message_data = QByteArray(bytes(message_data_in, encoding='utf8'))
message = QJsonDocument.fromJson(message_data)
if message.isNull():
print("Failed to parse text message as JSON object:", message_data)
return
if not message.isObject():
print("Received JSON message that is not an object: ", message_data)
return
self.messageReceived.emit(message.object(), self)
<?xml version="1.0" encoding="UTF-8"?>
<ui version="4.0">
<class>Dialog</class>
<widget class="QDialog" name="Dialog">
<property name="geometry">
<rect>
<x>0</x>
<y>0</y>
<width>400</width>
<height>300</height>
</rect>
</property>
<property name="windowTitle">
<string>Dialog</string>
</property>
<layout class="QGridLayout" name="gridLayout">
<item row="1" column="0">
<widget class="QLineEdit" name="input">
<property name="placeholderText">
<string>Message Contents</string>
</property>
</widget>
</item>
<item row="1" column="1">
<widget class="QPushButton" name="send">
<property name="text">
<string>Send</string>
</property>
</widget>
</item>
<item row="0" column="0" colspan="2">
<widget class="QPlainTextEdit" name="output">
<property name="readOnly">
<bool>true</bool>
</property>
<property name="plainText">
<string notr="true">Initializing WebChannel...</string>
</property>
<property name="backgroundVisible">
<bool>false</bool>
</property>
</widget>
</item>
</layout>
</widget>
<resources/>
<connections/>
</ui>
<!DOCTYPE html>
<html>
<head>
<meta http-equiv="Content-Type" content="text/html; charset=utf-8" />
<script type="text/javascript" src="./qwebchannel.js"></script>
<script type="text/javascript">
//BEGIN SETUP
function output(message) {
var output = document.getElementById("output");
output.innerHTML = output.innerHTML + message + "\n";
}
window.onload = function() {
if (location.search != "")
var baseUrl = (/[?&]webChannelBaseUrl=([A-Za-z0-9\-:/\.]+)/.exec(location.search)[1]);
else
var baseUrl = "ws://127.0.0.1:12345";
output("Connecting to WebSocket server at " + baseUrl + ".");
var socket = new WebSocket(baseUrl);
socket.onclose = function() {
console.error("web channel closed");
};
socket.onerror = function(error) {
console.error("web channel error: " + error);
};
socket.onopen = function() {
output("WebSocket connected, setting up QWebChannel.");
new QWebChannel(socket, function(channel) {
// make core object accessible globally
window.core = channel.objects.core;
document.getElementById("send").onclick = function() {
var input = document.getElementById("input");
var text = input.value;
if (!text) {
return;
}
output("Sent message: " + text);
input.value = "";
core.receiveText(text);
}
core.sendText.connect(function(message) {
output("Received message: " + message);
});
core.receiveText("Client connected, ready to send/receive messages!");
output("Connected to WebChannel, ready to send/receive messages!");
});
}
}
//END SETUP
</script>
<style type="text/css">
html {
height: 100%;
width: 100%;
}
#input {
width: 400px;
margin: 0 10px 0 0;
}
#send {
width: 90px;
margin: 0;
}
#output {
width: 500px;
height: 300px;
}
</style>
</head>
<body>
<textarea id="output"></textarea><br />
<input id="input" /><input type="submit" id="send" value="Send" onclick="javascript:click();" />
</body>
</html>