用Haskell实现一个聊天频道
我一定是极其无聊,所以才写了两百多行的恶心代码。然而其中还是有不少心血的,故记录下来以供以后参考。
先介绍下功能,使用websocket
作为通道,warp
为其容器,客户端使用浏览器。通过url
可以直接进入频道,频道内有最近的聊天记录,在线人列表。
Haskell
实现websocket
服务端 ,遇到几个关键问题:
- 保持socket的活跃,
websockets
包带有withPingThread
函数可以保障活跃。
- 线程的生命周期管理:比如浏览器关闭后,心跳线程、监听消息线程也要随之关闭。
- 线程间的通信,如A向B发送了一条消息以及维护最近消息和在线人数列表,它们都是在不同的线程里面。
线程生命周期管理可以用下面几个函数:
- async:异步开启一个线程
- cancel:取消由 async 开启的线程
- waitEitherCancel:等待两个 async,取先返回者的结果作为返回值,并同时取消另一个仍在运行的线程。
线程间通信可以使用 Chan 作为广播通道:
- newChan:创建一个 Chan
- dupChan:最关键的一个函数,复制 Chan 通道;复制之后,向其中任意一个 Chan 写入的值,所有副本都能各自读取到。
- readChan:从 Chan 中读取值
- writeChan:写入到 Chan 中
代码及实现思路:
开启服务与基础线程:
-- Start the chat bookkeeping thread, then run the warp server in the
-- foreground. When the server returns or dies, the bookkeeping thread is
-- stopped as well, because it is useless without the server.
httpServe = do
  chatChannel <- newChan
  chatAsync <- async (EM.chatHeadquarters chatChannel)
  outcome <- try (W.runSettings setting (W.toApplication (webapp (channel, chatChannel))))
  case outcome of
    Right _ -> pure ()
    -- Any exception escaping the server is logged rather than rethrown.
    Left e -> $err' $ "httpServe" ++ displayException (e :: SomeException)
  -- Stop the bookkeeping thread.
  cancel chatAsync
chatHeadquarters
为维护最近消息列表、频道列表、在线人列表线程
W.runSettings
为warp
提供的运行 http
及websockets
的容器 ,当其出现灾难级别的错误意味着整个服务都挂了,chatHeadquarters
线程就没有意义了,故 cancel chatAsync
当用户建立连接:
-- WebSocket endpoint for one chat channel. Each connection gets its own
-- duplicate of the broadcast Chan, announces itself, waits (at most 10s) for
-- the initial channel info, and then relays messages in both directions until
-- the connection ends.
channelApp = do
  channelHead <- getInnerState
  -- Per-connection duplicate of the shared broadcast channel.
  channel <- liftIO $ dupChan channelHead
  chatName <- W.consumV
  W.respSocket' $ \ pendingConn -> do
    conn <- W.acceptRequest pendingConn
    -- The first frame sent by the client is its display name.
    clientName <- W.receiveData conn
    let clientId = (chatName,clientName)
    -- Ask for the name to be validated: per the article, the chatHeadquarters
    -- thread handles this Online message and broadcasts the result back.
    writeChan channel $ Chat.Online clientId
    a <- async $ waitInitialInfo clientId channel
    -- 10s timeout
    b <- async $ threadDelay (oneSec * 10)
    -- Whichever finishes first wins; the other async is cancelled.
    r <- waitEitherCancel a b
    case r of
      Right _ -> W.sendClose conn ("Timeout"::Text)
      Left (Left _) -> W.sendClose conn ("Illege name"::Text)
      Left (Right info) -> do
        W.sendTextData conn $ A.encode (Chat.CInitialInfo info)
        let heart = pingSysem channel clientId
        -- Ping every 30s; browsers answer with Pong automatically, so the
        -- client needs no code for it. `heart` is run alongside each ping.
        W.withPingThread conn 30 heart $ do
          -- Connection-level failures: log, and for a close request or a
          -- dropped connection also broadcast that this client went offline.
          let controlException (W.CloseRequest code reson) = do
                $trace' $ show ("CloseRequest",code,reson)
                writeChan channel $ Chat.Offline clientId "CloseRequest Exception"
              controlException W.ConnectionClosed = do
                $trace' "ConnectionClosed"
                writeChan channel $ Chat.Offline clientId "ConnectionClosed"
              controlException e = do
                $err' $ show e
          -- Background thread: push channel broadcasts to this client.
          a <- async $ forever $ listenBroadcast channel conn clientId
          -- Foreground loop: read client frames until the connection fails.
          controlException `handle` (forever $ listenClient channel conn clientId)
          -- NOTE(review): this line is only reached when the loop above ends
          -- with an exception that controlException accepts; any other
          -- exception type would skip it and leave the broadcast thread
          -- running. Consider `finally`/`withAsync` -- TODO confirm.
          cancel a
每一个客户端建立连接都为其创建一个线程内部通信的通道
channel <- liftIO $ dupChan channelHead
名字验证逻辑在前面提到的chatHeadquarters
线程内完成,因为该线程维护着最全的信息可以判断重名与否,验证后的结果通过channel
发送过来。
writeChan channel $ Chat.Online clientId
为广播一个人来建立连接了,但只有chatHeadquarters线程处理这条消息 ,验证名字是否合法并把该频道内的最近消息和在线人员列表广播回来。
a <- async $ waitInitialInfo clientId channel
为新开启一个线程,等待接收chatHeadquarters
线程广播回来的信息。这两个线程的代码实现会放到最后。
由于等待时间未知,需要一个超时处理,如果超时就要取消等待线程,这里利用waitEitherCancel
巧妙的实现。
a <- async $ waitInitialInfo clientId channel
-- 10s超时
b <- async $ threadDelay (oneSec * 10)
r <- waitEitherCancel a b
接下来就是处理验证结果:
错误就关闭连接并且带着原因。
正确就进入主逻辑:
W.withPingThread conn 30 heart $ do
新开启线程每30s向客户端发送Ping
消息并调用下heart
,其中heart参数是一个 IO ()
,最后一个参数为主逻辑,如果挂掉Ping
线程也会相应挂掉。
a <- async $ forever $ listenBroadcast channel conn clientId
监听其它线程广播过来的消息,并处理——有消息就通过websocket
发送给客户端,同时判断消息是否属于本频道的,若不属于则忽略。
(forever $ listenClient channel conn clientId)
监听客户端发过来的消息,然后通过channel
广播出去。
同时做好异常处理:连接出现异常时由 controlException
记录日志(对于 CloseRequest / ConnectionClosed 还会广播该客户端的下线消息),随后终止listenBroadcast
线程。
至此大体思路讲解完毕了,后面给出各个线程 实现,比较无聊可以不用往后面看了。
chatHeadquarters
线程:
-- Bookkeeping thread for all chat channels. It owns the list of per-channel
-- infos (online members with their last-seen timestamp, recent messages) and
-- updates it in response to the events read from the broadcast channel.
chatHeadquarters chatChannel = do
  chatInfosRef <- newIORef []

  -- Once a minute, announce Offline ("Timeout") for every member whose last
  -- recorded timestamp is more than 40 * 1000 timestamp units old.
  liveCheck <- async $ forever $ do
    threadDelay (oneSec * 60)
    crstamp <- fmap Kit.mkTimestamp Kit.getCurrentTime
    chatInfos <- readIORef chatInfosRef
    forM_ chatInfos $ \info ->
      forM_ (Chat.onlineMember info) $ \(memberId, lastSeen) ->
        if crstamp - lastSeen > 40 * 1000
          then writeChan chatChannel $ Chat.Offline (Chat.chatName info, memberId) "Timeout"
          else pure ()

  -- If the event loop dies for any reason, stop the liveness checker and log.
  let onCrash e = do
        cancel liveCheck
        $err' $ show (e :: SomeException)

  -- Read the current infos, run `step` on the named chat through
  -- modifyOrCreateChat, and store the result.
  let updateChat name step = do
        current <- readIORef chatInfosRef
        updated <- modifyOrCreateChat name current step
        writeIORef chatInfosRef updated

  handle onCrash $ forever $ do
    event <- readChan chatChannel
    case event of
      -- A client announced itself: (re)insert it at the head of the member
      -- list and broadcast the resulting channel info.
      Chat.Online clientId@(chatName, memberId) ->
        updateChat chatName $ \info -> do
          timestamp <- fmap Kit.mkTimestamp Kit.getCurrentTime
          let dropMember = filter ((/= memberId) . fst)
              info' = Chat.over_onlineMember (((memberId, timestamp) :) . dropMember) info
          writeChan chatChannel $ Chat.InitialInfo clientId info'
          pure info'

      -- Heartbeat: refresh the member's timestamp.
      Chat.Ping (chatName, memberId) ->
        updateChat chatName $ \info -> do
          timestamp <- fmap Kit.mkTimestamp Kit.getCurrentTime
          let fresh = (memberId, timestamp)
          case List.find ((== memberId) . fst) (Chat.onlineMember info) of
            Nothing -> do
              -- The member was removed from the online list for some reason
              -- but its connection is still alive and pinging. This also
              -- happens when one id is logged in from several clients and
              -- one of them goes offline.
              let info' = Chat.over_onlineMember (fresh :) info
              writeChan chatChannel $ Chat.Online (chatName, memberId)
              pure info'
            Just _ -> do
              let refresh entry@(otherId, _)
                    | otherId == memberId = fresh
                    | otherwise = entry
              pure $ Chat.over_onlineMember (map refresh) info

      -- New message: prepend it and keep only the 100 most recent.
      Chat.Send chatName message ->
        updateChat chatName $ \info ->
          pure $ Chat.over_recentMessages (take 100 . (message :)) info

      -- Member left: drop it from the online list.
      Chat.Offline (chatName, memberId) reson ->
        updateChat chatName $ \info ->
          pure $ Chat.over_onlineMember (filter ((/= memberId) . fst)) info

      _ -> pure ()
listenClient
监听客户端消息线程:
-- Handle one frame from the client: a well-formed CMessage is stamped with
-- the server time and the sender's name and broadcast on the channel; any
-- other input is answered with CIllegeData.
listenClient channel conn clientId@(chatName', clientName) = do
  payload <- W.receiveData conn
  case A.decode payload of
    Just (Chat.CMessage msg) -> do
      timestamp <- fmap Kit.mkTimestamp Kit.getCurrentTime
      -- Sender is set first, then the time, as two successive updates.
      let stamped = Chat.set_time timestamp (Chat.set_sender clientName msg)
      writeChan channel (Chat.Send chatName' stamped)
    _ -> W.sendTextData conn $ A.encode Chat.CIllegeData
listenBroadcast
监听广播线程:
-- Handle one broadcast event: events addressed to this connection's chat are
-- translated to their client-facing form and sent over the socket; everything
-- else is ignored.
listenBroadcast channel conn clientId@(chatName', _) = do
  message <- readChan channel
  let forward payload = W.sendTextData conn (A.encode payload)
      sameRoom = (== chatName')
  case message of
    Chat.Offline who@(room, _) reason
      | sameRoom room -> forward (Chat.COffLine who reason)
    -- An InitialInfo for this chat doubles as the "member came online" notice.
    Chat.InitialInfo who@(room, _) _
      | sameRoom room -> forward (Chat.COnline who)
    Chat.Send room chatMessage
      | sameRoom room -> forward (Chat.CMessage chatMessage)
    _ -> pure ()
pingSysem(即 pingSystem,此处沿用代码中的实际拼写)
心跳广播线程
-- Heartbeat hook: broadcasts a Ping for this client so that its online status
-- kept in memory stays fresh.
pingSysem channel clientId =
  writeChan channel (Chat.Ping clientId)
以下为客户端(React)的实现,更无聊不建议看
websocket本地环境代理实现:
import { createProxyMiddleware } from 'http-proxy-middleware';
import { NextApiRequest, NextApiResponse } from 'next';
// Local-development proxy: forwards everything under /api (HTTP and
// WebSocket) to the backend listening on localhost:8899.
const proxyOptions = {
  target: "http://localhost:8899",
  // Also proxy WebSocket upgrade requests.
  ws: true,
  // Strip the leading `/api` segment before forwarding.
  pathRewrite: { "^/api": "" },
};
let proxy = createProxyMiddleware('/api/**', proxyOptions) as any;
/**
 * Next.js API route that hands every request to the local proxy.
 *
 * The callback passed as `next` only runs when the proxy does not take the
 * request; it always throws, either the error it was given or a generic
 * "bad request" error.
 */
export default async function handler(req: NextApiRequest, res: NextApiResponse) {
  const onUnhandled = (err: any) => {
    if (!err) {
      throw new Error(`Local proxy received bad request for ${req.url}`);
    }
    throw err;
  };
  proxy(req, res, onUnhandled);
}
chatSocket.ts
:
// Callbacks for the four WebSocket events. Each callback receives the socket
// itself in addition to the event object.
type Events = {
  open: (socket: WebSocket, event: Event) => void;
  message: (socket: WebSocket, event: MessageEvent) => void;
  error: (socket: WebSocket, event: Event) => void;
  close: (socket: WebSocket, event: CloseEvent) => void;
}
/**
 * Open a WebSocket to `url` and wire the four callbacks in `events` to it.
 * Every callback is invoked with the socket as its first argument.
 *
 * @returns the newly created WebSocket
 */
export function chat(url: string, events: Events) {
  const socket = new WebSocket(url)
  socket.addEventListener("close", (ev) => {
    events.close(socket, ev)
  })
  socket.addEventListener("error", (ev) => {
    events.error(socket, ev)
  })
  socket.addEventListener("message", (ev) => {
    events.message(socket, ev)
  })
  socket.addEventListener("open", (ev) => {
    events.open(socket, ev)
  })
  return socket
}
/** Close the socket with application close code 3000 when the page is left. */
export function cancel(ws: WebSocket) {
  const closeCode = 3000
  const closeReason = "Page have left"
  ws.close(closeCode, closeReason)
}
/**
 * Return this browser's chat id, creating and persisting one on first use.
 *
 * The id is stored in localStorage under "myid". A stored value is returned
 * as-is; otherwise a random 7-letter id is generated, saved and returned.
 *
 * The generator draws uniformly from A-Z and a-z only. The previous
 * `Math.round(65 + Math.random() * 57)` covered every char code from 65 to
 * 122, so ids could contain `[`, `\`, `]`, `^`, `_` and a backtick.
 */
export function doName() {
  const stored = localStorage.getItem("myid")
  if (stored != null) {
    return stored
  }
  const letters = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz"
  let fresh = ""
  for (let i = 0; i < 7; i++) {
    fresh += letters.charAt(Math.floor(Math.random() * letters.length))
  }
  localStorage.setItem("myid", fresh)
  return fresh
}
/**
 * Connect to the chat channel `chatName` on the current host.
 *
 * Uses ws: when the page is served over http: and wss: otherwise. Once the
 * socket opens, this browser's id (see doName) is sent as the first frame.
 *
 * The channel name is percent-encoded before it is placed in the URL path, so
 * names containing `/`, `?`, `#` or `%` address the intended channel instead
 * of producing a different or invalid URL.
 *
 * @param chatName  channel name, unencoded
 * @param onMessage callback for every message event on the socket
 * @returns the WebSocket created for the channel
 */
export function chatForChannel(chatName: string, onMessage: Events["message"]) {
  let protocol = location.protocol == "http:" ? "ws:" : "wss:"
  const channelSegment = encodeURIComponent(chatName)
  return chat(`${protocol}//${location.host}/api/chat/channel/${channelSegment}`, {
    open(ws, e) {
      ws.send(doName())
    },
    error(ws, e) {
      console.error(e)
      alert('A error apear from WebSocket')
    },
    message: onMessage,
    close(ws, e) {
      console.log("WebSocket-close", e)
    }
  })
}
mainpage(主页面)
:
import { Stack,chakra,useColorMode,Text, Box, IconButton, ChakraProps, Center, useColorModeValue, Tooltip, useToken, useTheme, VStack, BoxProps, TextProps, TextareaProps, useClipboard, useToast, UseModalProps, Button, Input, useDisclosure } from '@chakra-ui/react'
import type { NextPage } from 'next'
import {CopyIcon, MoonIcon,SmallAddIcon,SunIcon} from '@chakra-ui/icons'
import React, { useEffect, useRef, useState } from 'react'
import FlexInputAutomatically from '@c/FlexInputAutomatically'
import Sending from "src/icons/Sending"
import { useRouter } from 'next/router'
import { cancel, chatForChannel, doName } from 'src/channel/chatSocket'
import {
Modal,
ModalOverlay,
ModalContent,
ModalHeader,
ModalFooter,
ModalBody,
ModalCloseButton,
} from '@chakra-ui/react'
// Props of the "switch channel" dialog.
type SwitchChannelProps = {
  // Whether the dialog is currently shown.
  isOpen: boolean;
  // Called when the dialog asks to be closed.
  onClose: UseModalProps["onClose"];
  // Called with the entered channel name when the user confirms.
  onOk: (channelName: string) => void;
}
/**
 * Modal dialog asking for a channel name.
 *
 * Confirming with an empty name marks the input as invalid instead of calling
 * `onOk`; typing a non-empty value clears that mark.
 */
let SwitchChannel : React.FC<SwitchChannelProps> = props => {
  let [channel, setChannel] = useState({
    value: "",
    isInvalid: false
  })

  const confirm : React.MouseEventHandler<HTMLButtonElement> = () => {
    if (channel.value != "") {
      props.onOk(channel.value)
      return
    }
    setChannel(prev => ({ ...prev, isInvalid: true }))
  }

  const handleInput : React.FormEventHandler<HTMLInputElement> = event => {
    const typed = event.currentTarget.value
    if (typed != "") {
      setChannel({ value: typed, isInvalid: false })
      return
    }
    // Clearing the field keeps whatever validity flag was already set.
    setChannel(prev => ({ ...prev, value: "" }))
  }

  return <Modal isOpen={props.isOpen} onClose={props.onClose}>
    <ModalOverlay />
    <ModalContent>
      <ModalHeader>切换频道</ModalHeader>
      <ModalCloseButton />
      <ModalBody>
        <Input variant='flushed'
          onInput={handleInput}
          isInvalid={channel.isInvalid}
          value={channel.value} placeholder='输入频道名称' />
      </ModalBody>
      <ModalFooter>
        <Button colorScheme='blue' mr={3} onClick={props.onClose}>
          关闭
        </Button>
        <Button onClick={confirm} variant='ghost'>确定</Button>
      </ModalFooter>
    </ModalContent>
  </Modal>
}
/**
 * Page frame: a centred, bordered column (full width on small screens, "xl"
 * from the third breakpoint up) with top and bottom padding that leaves room
 * for the fixed header and input bar.
 */
let BoxOut : React.FC<{children: React.ReactElement[]}> = props => {
  const frameColor = useColorModeValue("gray.200","gray.500")
  return (
    <Box
      width={["100vw",null,"xl"]}
      margin="auto"
      minH="100vh"
      border="solid"
      borderColor={frameColor}
      borderWidth="1px"
      borderRadius="xl"
      paddingTop="84px"
      paddingBottom="78px"
      overflow="hidden"
      borderBottom="none">
      {props.children}
    </Box>
  )
}
// Props of a single chat bubble.
type MessageProps = {
  // The chat message; the component reads its `time`, `sender` and `content`.
  message: any;
  // "me" for the local user's own messages, "other" for everyone else's.
  type: "me" | "other";
}
/**
 * One chat bubble.
 *
 * Own messages ("me") are right-aligned and teal, other people's are
 * left-aligned and gray. The sender's name floats above the bubble and the
 * formatted send time is exposed as the text's tooltip.
 *
 * All colour-mode hooks are called unconditionally at the top of the
 * component: hooks must not be called inside `if`/`else` branches.
 */
let Message : React.FC<MessageProps> = props => {
  const me = props.type == "me"

  // Resolve every colour up front, then pick the ones this bubble needs.
  const ownBubbleBg = useColorModeValue("teal.200","teal.500")
  const otherBubbleBg = useColorModeValue("gray.200","gray.500")
  const senderColor = useColorModeValue("gray.500","whiteAlpha.500")

  const boxSetting : BoxProps = me
    ? { mr : "2", bg : ownBubbleBg }
    : { ml : "2", bg : otherBubbleBg }
  // Pin the sender label to the same side as the bubble.
  const toolSetting : TextProps = me
    ? { right: "0" }
    : { left: "0" }

  function formatDate(){
    let d = new Date(props.message.time)
    return d.toLocaleString()
  }

  return <VStack
    w="full"
    alignItems={me ? "flex-end" : "flex-start"}
    >
    <Box
      {...boxSetting}
      borderRadius="xl"
      padding="2"
      minW="20"
      maxW={["350px","500px"]}
      position="relative"
      >
      <Text
        position="absolute"
        top="-20px"
        {...toolSetting}
        fontSize="xs" color={senderColor}>{props.message.sender}</Text>
      <Text fontSize="md" whiteSpace="pre-line" title={formatDate()}>{props.message.content}</Text>
    </Box>
  </VStack>
}
// True when the browser's user-agent string names a known mobile platform
// (case-insensitive match).
let isMobile = () => {
  const mobileAgentPattern = /Android|webOS|iPhone|iPad|iPod|BlackBerry|IEMobile|Opera Mini/i
  return mobileAgentPattern.test(navigator.userAgent)
}
/**
 * Chat channel page.
 *
 * Connects to the channel named by the `id` route parameter, shows the
 * message history and the number of online members, and lets the user send
 * messages, copy the page link, switch channel and toggle the colour mode.
 */
const TextS : NextPage = () => {
  // Upper bound on the number of messages kept in memory.
  const MAX_MESSAGES = 1000

  let router = useRouter()
  let [message,setMessage] = useState("")
  // Bumped whenever the list changes so the page can scroll to the bottom.
  let [listUpdatedVersion,setListUpdatedVersion] = useState(0)
  let [list,setList] = useState([] as {content:string,sender:string,time:number}[])
  let [members,setMembers] = useState([] as [string,number][])
  let [me,setMe] = useState("")
  let [chatWS,setChatWS] = useState(null as WebSocket | null)
  let toast = useToast()

  // Dispatch one decoded server message by its `tag`.
  function onMessageFromServer(data:{tag:string,contents:any},ws:WebSocket){
    console.log("onMessage",data)
    if(data.tag == "CInitialInfo"){
      // The socket only becomes usable for sending once the server accepted us.
      setChatWS(ws)
      // Reversed so the list can be extended by appending new messages.
      setList(data.contents.recentMessages.reverse())
      setListUpdatedVersion(Date.now())
      setMembers(data.contents.onlineMember)
    } else if(data.tag=="CIllegeData"){
      toast({
        status: "warning",
        description: "非法输入",
        isClosable: true
      })
    } else if(data.tag == "CMessage"){
      // Keep the NEWEST messages: trimming from the front. `slice(0, n)`
      // would keep the oldest ones and drop every new message once full.
      setList(list => list.concat([data.contents]).slice(-MAX_MESSAGES))
      setListUpdatedVersion(Date.now())
    } else if(data.tag == "COffLine"){
      let a = data.contents[0][1]
      // if(a != me) {
      setMembers(members => members.filter(([id])=>id != a))
      // }
    } else if(data.tag == "COnline"){
      console.log("COnline-members",members.length)
      let a = data.contents[1]
      setMembers(members => members.filter(([id])=>id != a).concat([[a,Date.now()]]))
    }
  }

  // Scroll to the newest message after every list update (not on first render).
  useEffect(()=>{
    if(listUpdatedVersion != 0){
      window.scrollTo(0,document.body.scrollHeight)
    }
  },[listUpdatedVersion])

  let {colorMode,toggleColorMode} = useColorMode()
  let ColorModeIcon = colorMode == "light" ? <MoonIcon /> : <SunIcon />
  let bg = useColorModeValue("whiteAlpha.900","blackAlpha.200")

  // Send the draft; `sender` and `time` are placeholders in the payload.
  let sendAction = () => {
    if(message != ""){
      chatWS?.send(JSON.stringify({
        tag: "CMessage",
        contents:{
          content:message,
          sender: "=",
          time:0,
        }
      }))
      setMessage("")
    }
  }

  // Enter sends, Shift-Enter inserts a newline; on mobile Enter never sends.
  let messageInputOnKeyDown : TextareaProps["onKeyDown"] = e => {
    if(e.shiftKey == false && e.code == "Enter" && isMobile() == false){
      e.preventDefault()
      sendAction()
    }
  }

  let [uri,setUri] = useState(router.pathname)
  let { hasCopied, onCopy } = useClipboard(uri)

  // Keep the shareable link in sync with the current channel.
  useEffect(()=>{
    setUri(location.href)
  },[router.query.id])

  // (Re)connect whenever the channel changes; close the old socket on cleanup.
  useEffect(()=>{
    if(router.query.id == null) return
    setMe(doName())
    let ws = chatForChannel(router.query.id as string,(ws,e)=>{
      onMessageFromServer(JSON.parse(e.data),ws)
    })
    return ()=>{
      cancel(ws)
    }
  },[router.query.id])

  useEffect(()=>{
    if(hasCopied){
      toast({
        status: "success",
        position: "top",
        isClosable: true,
        description: "复制成功," + uri
      })
    }
  },[hasCopied])

  let { isOpen, onOpen, onClose } = useDisclosure()
  let onConfirm: SwitchChannelProps["onOk"] = e => {
    router.push("/channel/" + e)
    onClose()
  }

  return (
    <BoxOut>
      <SwitchChannel isOpen={isOpen} onClose={onClose} onOk={onConfirm} />
      <Stack
        direction={"row"}
        zIndex="docked"
        bg={useColorModeValue("gray.200","gray.500")}
        position="fixed"
        w={["100vw",null,"xl"]}
        padding="4"
        top="0"
        alignItems={"center"}>
        <Text fontSize={"md"}>
          {router.query.id}
        </Text>
        <Text fontSize={"xs"} ml="px">
          ({members.length}人在线)
        </Text>
        <Stack direction="row" flexGrow={1} justifyContent="flex-end">
          <Tooltip label="复制分享">
            <IconButton
              onClick={onCopy}
              aria-label='分享' size={"sm"} icon={<CopyIcon />} />
          </Tooltip>
          <IconButton
            title="更换一个频道"
            onClick={onOpen}
            aria-label='更换一个频道' size={"sm"} icon={<SmallAddIcon />} />
          <IconButton
            title="高亮/夜间模式"
            aria-label='高亮/夜间模式' size={"sm"} onClick={toggleColorMode} icon={ColorModeIcon} />
        </Stack>
      </Stack>
      <VStack spacing={12}>
        {list.map(a=><Message message={a} key={a.sender+a.time} type={a.sender == me ? "me" : "other"} />)}
      </VStack>
      <Box
        position="fixed"
        w={["100vw",null,"xl"]}
        bottom={0}
        zIndex="docked"
        bg={useColorModeValue("gray.200","gray.500")}
        ml="-1px"
        padding="8px 16px"
        >
        <FlexInputAutomatically
          bg={bg}
          borderColor={bg}
          aria-label="输入信息"
          value={message}
          autoFocus={true}
          placeholder="Enter发送/Shift-Enter换行"
          pr="9"
          onKeyDown={messageInputOnKeyDown}
          onInput={message=>setMessage(message)}
          />
        <Sending
          position="absolute"
          zIndex="docked"
          boxSize={6}
          onClick={sendAction}
          right="7"
          cursor="pointer"
          top="4"
          />
      </Box>
    </BoxOut>
  )
}
export default TextS