目錄
- 實現原理
- 第一步 安裝環境依賴
- 第二步 配置Celery
- 第三步 編寫機器人聊天主頁面
- 第四步 編寫后臺websocket路由及處理方法
- 第五步 編寫Celery異步任務
- 第六步 運行看效果
- 小結
演示效果如下所示:

實現原理
用戶在聊天界面調用Celery異步任務,Celery異步任務執行完畢后發送結果給channels,然后channels通過websocket將結果實時推送給用戶。對于簡單的算術運算,Celery一般自行計算就好了。對于網上查找詩人簡介這樣的任務,Celery會調用Python爬蟲(requests+parsel)爬取古詩文網站上的詩人簡介,把爬取結果實時返回給用戶。
接下來我們來看下具體的代碼實現吧。
第一步 安裝環境依賴
首先在虛擬環境中安裝django和以下主要項目依賴。本項目使用了最新版本,為3.X版本。
# 主要項目依賴
pip install django
pip install channels
pip install channels_redis
pip install celery
pip install redis
pip install eventlet # windows only
# 爬蟲依賴
pip install requests
pip install parsel
新建一個名為myproject的項目,新建一個app名為bots。如果windows下安裝報錯,如何解決自己網上去找吧,很容易解決。修改settings.py, 將channels和chat加入到INSTALLED_APPS里,并添加相應配置,如下所示:
INSTALLED_APPS = [
'django.contrib.admin',
'django.contrib.auth',
'django.contrib.contenttypes',
'django.contrib.sessions',
'django.contrib.messages',
'django.contrib.staticfiles',
'channels', # channels應用
'bots', # bots應用
]
# 設置ASGI應用
ASGI_APPLICATION = 'myproject.asgi.application'
# 生產環境中使用redis做后臺,安裝channels_redis
import os
CHANNEL_LAYERS = {
"default": {
"BACKEND": "channels_redis.core.RedisChannelLayer",
"CONFIG": {
"hosts": [os.environ.get('REDIS_URL', 'redis://127.0.0.1:6379/2')],
},
},
}
最后將bots應用的urls.py加入到項目urls.py中去,這和常規Django項目無異。
# myproject/urls.py
from django.conf.urls import include
from django.urls import path
from django.contrib import admin
urlpatterns = [
path('bots/', include('bots.urls')),
path('admin/', admin.site.urls),
]
第二步 配置Celery
pip安裝好Celery和redis后,我們要對其進行配置。分別修改myproject目錄下的__init__.py和celery.py(新建), 添加如下代碼:
# __init__.py
from .celery import app as celery_app
__all__ = ('celery_app',)
# celery.py
import os
from celery import Celery
# 設置環境變量
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'myproject.settings')
# 實例化
app = Celery('myproject')
# namespace='CELERY'作用是允許你在Django配置文件中對Celery進行配置
# 但所有Celery配置項必須以CELERY開頭,防止沖突
app.config_from_object('django.conf:settings', namespace='CELERY')
# 自動從Django的已注冊app中發現任務
app.autodiscover_tasks()
# 一個測試任務
@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')
接著修改settings.py, 增加如下Celery配置:
# Celery配置
CELERY_BROKER_URL = "redis://127.0.0.1:6379/0"
CELERY_TIMEZONE = TIME_ZONE
# celery內容等消息的格式設置,默認json
CELERY_ACCEPT_CONTENT = ['application/json', ]
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
完整Celery配置見:Django進階:萬字長文教你使用Celery執行異步和周期性任務(多圖)
第三步 編寫機器人聊天主頁面
本例我們只需要利用django普通視圖函數編寫1個頁面,用于展示首頁(index)與用戶交互的聊天頁面。這個頁面對應的路由及視圖函數如下所示:
# bots/urls.py
from django.urls import path
from . import views
urlpatterns = [
path('', views.index, name='index'),
]
# bots/views.py
from django.shortcuts import render
def index(request):
return render(request, 'bots/index.html', {})
接下來我們編寫模板文件index.html,它的路徑位置如下所示:
bots/
__init__.py
templates/
bots/
index.html
urls.py
views.py
index.html內容如下所示。
!DOCTYPE html>
html>
head>
meta charset="utf-8"/>
title>Django+Channels+Celery聊天機器人/title>
/head>
body>
textarea id="chat-log" cols="100" rows="20" readonly>/textarea>
br/>
input id="chat-message-input" type="text" size="100"
placeholder="輸入`help`獲取幫助信息."/>br/>input id="chat-message-submit" type="button" value="Send"/>
script>
var wss_protocol = (window.location.protocol == 'https:') ? 'wss://': 'ws://';
var chatSocket = new WebSocket(
wss_protocol + window.location.host + '/ws/bots/'
);
chatSocket.onopen = function(e) {
document.querySelector('#chat-log').value +=
('歡迎來到大江狗Django聊天機器人. 請輸入`help`獲取幫助信息.\n')}
chatSocket.onmessage = function(e) {
var data = JSON.parse(e.data);
var message = data['message'];
document.querySelector('#chat-log').value += (message + '\n');
};
chatSocket.onclose = function(e) {
document.querySelector('#chat-log').value +=
('Socket closed unexpectedly, please reload the page.\n')};
document.querySelector('#chat-message-input').focus();
document.querySelector('#chat-message-input').onkeyup = function(e) {
if (e.keyCode === 13) { // enter, return
document.querySelector('#chat-message-submit').click();
}
};
document.querySelector('#chat-message-submit').onclick = function(e) {
var messageInputDom = document.querySelector('#chat-message-input');
var message = messageInputDom.value;
chatSocket.send(JSON.stringify({
'message': message
}));
messageInputDom.value = '';
};
/script>
/body>
/html>
第四步 編寫后臺websocket路由及處理方法
當 channels 接受 WebSocket 連接時, 它也會根據根路由配置去查找相應的處理方法。只不過channels的websocket路由不在urls.py中配置,處理函數也不寫在views.py。在channels中,這兩個文件分別變成了routing.py和consumers.py。
在bots應用下新建routing.py, 添加如下代碼。它的作用是將發送至ws/bots/的websocket請求轉由BotConsumer處理。
from django.urls import re_path
from . import consumers
websocket_urlpatterns = [
re_path(r'ws/bots/$', consumers.BotConsumer.as_asgi()),
]
注意:定義websocket路由時,推薦使用常見的路徑前綴 (如/ws) 來區分 WebSocket 連接與普通 HTTP 連接, 因為它將使生產環境中部署 Channels 更容易,比如nginx把所有/ws的請求轉給channels處理。
與Django類似,我們還需要把這個app的websocket路由加入到項目的根路由中去。編輯myproject/asgi.py, 添加如下代碼:
# myproject/asgi.py
import os
from channels.auth import AuthMiddlewareStack
from channels.routing import ProtocolTypeRouter, URLRouter
from django.core.asgi import get_asgi_application
import chat.routing
import bots.routing
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "myproject.settings")
application = ProtocolTypeRouter({
"http": get_asgi_application(),
# websocket請求使用的路由
"websocket": AuthMiddlewareStack(
URLRouter(
bots.routing.websocket_urlpatterns
)
)
})
接下來在bots應用下新建consumers.py, 添加如下代碼:
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer
from . import tasks
COMMANDS = {
'help': {
'help': '命令幫助信息.',
},
'add': {
'args': 2,
'help': '計算兩個數之和, 例子: `add 12 32`.',
'task': 'add'
},
'search': {
'args': 1,
'help': '通過名字查找詩人介紹,例子: `search 李白`.',
'task': 'search'
},
}
class BotConsumer(WebsocketConsumer):
def receive(self, text_data):
text_data_json = json.loads(text_data)
message = text_data_json['message']
response_message = '請輸入`help`獲取命令幫助信息。'
message_parts = message.split()
if message_parts:
command = message_parts[0].lower()
if command == 'help':
response_message = '支持的命令有:\n' + '\n'.join(
[f'{command} - {params["help"]} ' for command, params in COMMANDS.items()])
elif command in COMMANDS:
if len(message_parts[1:]) != COMMANDS[command]['args']:
response_message = f'命令`{command}`參數錯誤,請重新輸入.'
else:
getattr(tasks, COMMANDS[command]['task']).delay(self.channel_name, *message_parts[1:])
response_message = f'收到`{message}`任務.'
async_to_sync(self.channel_layer.send)(
self.channel_name,
{
'type': 'chat.message',
'message': response_message
}
)
def chat_message(self, event):
message = event['message']
# Send message to WebSocket
self.send(text_data=json.dumps({
'message': f'[機器人]: {message}'
}))
上面代碼中最重要的一行如下所示。BotConsumer在接收到路由轉發的前端消息后,對其解析,將當前頻道名和解析后的參數一起交由Celery異步執行。Celery執行任務完成以后會將結果發到這個頻道,這樣就實現了channels和Celery的通信。
getattr(tasks, COMMANDS[command]['task']).delay(self.channel_name, *message_parts[1:])
第五步 編寫Celery異步任務
在bots目錄下新建`tasks.py`,添加如下代碼:
from asgiref.sync import async_to_sync
from celery import shared_task
from channels.layers import get_channel_layer
from parsel import Selector
import requests
channel_layer = get_channel_layer()
@shared_task
def add(channel_name, x, y):
message = '{}+{}={}'.format(x, y, int(x) + int(y))
async_to_sync(channel_layer.send)(channel_name, {"type": "chat.message", "message": message})
print(message)
@shared_task
def search(channel_name, name):
spider = PoemSpider(name)
result = spider.parse_page()
async_to_sync(channel_layer.send)(channel_name, {"type": "chat.message", "message": str(result)})
print(result)
class PoemSpider(object):
def __init__(self, keyword):
self.keyword = keyword
self.url = "https://so.gushiwen.cn/search.aspx"
def parse_page(self):
params = {'value': self.keyword}
response = requests.get(self.url, params=params)
if response.status_code == 200:
# 創建Selector類實例
selector = Selector(response.text)
# 采用xpath選擇器提取詩人介紹
intro = selector.xpath('//textarea[starts-with(@id,"txtareAuthor")]/text()').get()
print("{}介紹:{}".format(self.keyword, intro))
if intro:
return intro
print("請求失敗 status:{}".format(response.status_code))
return "未找到詩人介紹。"
以上兩個任務都以channel_name為參數,任務執行完畢后通過channel_layer的send方法將結果發送到指定頻道。
注意:
- 默認獲取channel_layer的方式是調用接口:channels.layers.get_channel_layer()。如果是在consumer中調用接口的話可以直接使用self.channel_layer。
- 對于channel layer的方法(包括send()、group_send(),group_add()等)都屬于異步方法,這意味著在調用的時候都需要使用await,而如果想要在同步代碼中使用它們,就需要使用裝飾器asgiref.sync.async_to_sync
第六步 運行看效果
如果不出意外,你現在的項目布局應該如下所示。說實話,整個項目一共沒幾個文件,Python的簡潔和效率真是出了名的好啊。

連續運行如下命令,就可以看到我們文初的效果啦。
# 啟動django測試服務器
python manage.py makemigrations
python manage.py migrate
python manage.py runserver
# windows下啟動Celery需eventlet
# 啟動Celery前確定redis服務已開啟哦
Celery -A myproject worker -l info -P eventlet
小結
本文我們使用Django + Channels + Celery + Redis打造了一個聊天機器人,既會算算術,還會查古詩文。借用這個實現原理,你可以打造非常有趣的實時聊天應用哦,比如在線即時問答,在線客服,實時查詢訂單,Django版的siri美女等等。
Django Channels + Websocket + Celery聊天機器人項目源碼地址:https://github.com/shiyunbo/django-channels-chatbot
以上就是Django實現聊天機器人的詳細內容,更多關于Django 聊天機器人的資料請關注腳本之家其它相關文章!
您可能感興趣的文章:- Python實戰整活之聊天機器人
- Python如何實現機器人聊天
- vue.js實現h5機器人聊天(測試版)
- python操作微信自動發消息的實現(微信聊天機器人)
- Python使用20行代碼實現微信聊天機器人
- jquery實現聊天機器人
- 基于python的itchat庫實現微信聊天機器人(推薦)
- nodejs實現聊天機器人功能
- Python QQBot庫的QQ聊天機器人
- 使用python接入微信聊天機器人
- python微信聊天機器人改進版(定時或觸發抓取天氣預報、勵志語錄等,向好友推送)