socket系列(三)- 客户端与服务端保持长连接(实战篇)

[复制链接]
查看577 | 回复4 | 2023-9-27 16:46:57 | 显示全部楼层 |阅读模式

背景介绍:项目使用了两台linux设备来控制同一个售货机,但是在后台是希望将两台设备看作一体。所以就少不了两台设备之间的简单通信。

我希望两者的socket能够保持长连接,并且能够断连自恢复。需求就这么简单!接下来看实现:

代码相对来说比较长,所以就以注释的形式进行说明了!

先来看server端实现:

void * socket_server_func(void * args)
{
    int ret = 0;
    char rcv_buff[128] = {0};
    // 这里添加了一个SOCK_NONBLOCK的socket类型,表示这个socket采用非阻塞形式通信
    int server_socket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr("192.168.1.10");
    server_addr.sin_port = htons("9090");
    bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
    listen(server_socket, 20);

    struct sockaddr client_addr;
    socklen_t client_addr_size = sizeof(client_addr);
    /*****************************************************************************/
    /*                            保持长连接的关键代码                              */
    /*****************************************************************************/
    while (1) {
        // server端的第一层循环,用于在客户端断开后,重新建立新的client_socket
        int client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &client_addr_size);
        while (1) {
            // server端的第二层循环,用于发送消息以及对副机的反馈进行处理。兼顾检测客户端心跳
            if (socket_msg_queue->next != NULL) { // socket消息队列不为空的情况下
                socket_msg_rear = read_queue(socket_msg_top, socket_msg_rear);  // 出队
                ret = send(client_socket, out->msgall->socket_msg, sizeof(out->msgall->socket_msg), 0);
                if (ret <= 0) {
                    LOGE("副机发送消息失败:%s \n", out->msgall->socket_msg);
                    socket_msg_rear = write_queue(socket_msg_rear, out->msgall);
                } else {
                    ;
                }
                // 下面是一些项目需要的逻辑实现,就不多做介绍了
                sleep(10);
                memset(rcv_buff, 0, sizeof(rcv_buff));
                ret = recv(client_socket, rcv_buff, 128, 0);
                if (ret > 0) {
                    if (strstr(rcv_buff, "reboot")) {
                        LOGD("副机反馈:reboot\n");
                    } else if (strstr(rcv_buff, "noevent")) {
                        LOGD("副机反馈:noevent\n");
                        slave_event_handling = 0;
                    } else if (strstr(rcv_buff, "handling")) {
                        LOGD("副机反馈:handling\n");
                        slave_event_handling = 1;
                    }
                }

                free(out->msgall->socket_msg);
                free(out->msgall);
            }

            // 检测心跳副机心跳,这里这么写,主要是因为我的副机除了心跳没有其他主动发消息的业务。所以,主机如果在一定时间内未收到来自副机的消息,那它一定是掉线了
            ret = recv(client_socket, rcv_buff, 128, 0);
            if (ret > 0) {
                // 对客户端的心跳做镜像恢复,有助于副机判断是否保持连接
                send(client_socket, rcv_buff, sizeof(rcv_buff), 0);
                LOGD("主机收到副机Socket消息:%s \n", rcv_buff);
                if (strstr(rcv_buff, "heartbeat")) {
                    slave_running = 1;
                } else {
                    // 其实从理论上来讲,这个分支是多余的,但是为了严谨嘛
                    LOGE("副机已离线!\n");
                    sleep(5);
                    break;
                }
            } else {
                // 如果在15s以内没有检测到副机的心跳,判断它离线
                LOGE("副机已离线!\n");
                sleep(5);
                break;  // 退出内层循环
            }
            sleep(15);  // 这里的sleep很关键,需要大于客户端心跳的事件间隙
        }
        close(client_socket);  // 关闭当前套接字,这个很关键。
        sleep(10);
        // 然后重新进入外层循环
    }
    close(server_socket);
}

然后是client端:

/**
 * 与服务端建立连接并保持连接。
 *      1. 连接服务端
 *      2. 读取socket队列判断有无待发送消息
 *      3. 发送消息(2 == true)
 *      4. 接收消息
 *      5. 消息处理(4 == true)
*/
void * socket_client_func(void * args)
{
    char rcv_buff[128];
    int client_socket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
    if (client_socket < 0) {
        LOGE("socket error!\n");
        return NULL;
    }
    struct sockaddr_in server_addr;
    memset(&server_addr, 0, sizeof(server_addr));
    server_addr.sin_family = AF_INET;
    server_addr.sin_addr.s_addr = inet_addr("192.168.1.10");
    server_addr.sin_port = htons("9090");

    // 与server端同样的原理,client端也采用双层循环,外部做重连,内层保持连接。
    while (1) {
        int ret = connect(client_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
        if (ret) {
            LOGE("主副机建立Socket连接失败!\n");
            sleep(5);  // 很有可能测试主机没上线,所以不要立即continue,做一定延时比较好
            continue;
        } 
        LOGI("主副机建立Socket连接成功!\n");
        while (1) {
            // 发布心跳消息,可以自定义心跳消息,使用时间戳可能比较好
            ret = send(client_socket, "heartbeat", sizeof("heartbeat"), 0);
            if (ret > 0) {
                LOGD("副机发布心跳消息:heartbeat\n");
            }
            sleep(5);
            memset(rcv_buff, 0, sizeof(rcv_buff));
            // 获取来自主机的镜像回复
            ret = recv(client_socket, rcv_buff, sizeof(rcv_buff), 0);
            if (ret > 0) {
                if (strstr(rcv_buff, "heartbeat")) {  // 根据镜像回复判断连接状态
                    LOGD("心跳检测成功!\n");
                } else {
                    break;  // 如果没有收到主机回复,则立即重连
                }
            }
            // 下面是项目的具体逻辑实现,感兴趣的可以看看
#if 0
            if (socket_msg_queue->next != NULL) {
                socket_msg_rear = read_queue(socket_msg_top, socket_msg_rear);
                if (strstr(out->msgall->socket_msg, "xxxxxxxx")){
                    /* 副机暂时没有向主机主动发送消息的需求 */
                    ret = send(client_socket, "xxxxxxxx", sizeof("xxxxxxxx"), 0);
                    if (ret <= 0) {
                        LOGE("副机发送消息失败:%s \n", "xxxxxxxx");
                        mqtt_message_all * msgall = (mqtt_message_all *)klmalloc(sizeof(mqtt_message_all));
                        msgall->socket_msg = (char *)klmalloc(sizeof("xxxxxxxx"));
                        strcpy(msgall->socket_msg, "xxxxxxxx");
                        socket_msg_rear = write_queue(socket_msg_rear, msgall);
                    } else {
                        free(out->msgall->socket_msg);
                        free(out->msgall);
                    }
                } else if (strstr(out->msgall->socket_msg, "nnnnnnnn")) {
                    ;
                }
            }
#endif // 0
            memset(rcv_buff, 0, sizeof(rcv_buff));
            ret = recv(client_socket, rcv_buff, sizeof(rcv_buff), 0);
            if (ret > 0) {
                LOGD("副机收到socket消息:%s \n", rcv_buff);
                send(client_socket, rcv_buff, sizeof(rcv_buff), 0);  // 镜像返回消息
                if (strstr(rcv_buff, "reboot")) {
                    auto_reboot();
                } else if (strstr(rcv_buff, "get_event_status")) {
                    if (event_handling == 0) {
                        ret = send(client_socket, "noevent", 8, 0);
                        if (ret <= 0) {
                            LOGE("副机发送消息失败:%s \n", "noevent");
                            mqtt_message_all * msgall = (mqtt_message_all *)klmalloc(sizeof(mqtt_message_all));
                            msgall->socket_msg = (char *)klmalloc(sizeof("noevent"));
                            strcpy(msgall->socket_msg, "noevent");
                            socket_msg_rear = write_queue(socket_msg_rear, msgall);
                        }
                    } else {
                        ret = send(client_socket, "handling", 9, 0);
                        if (ret <= 0) {
                            LOGE("副机发送消息失败:%s \n", "handling");
                            mqtt_message_all * msgall = (mqtt_message_all *)klmalloc(sizeof(mqtt_message_all));
                            msgall->socket_msg = (char *)klmalloc(sizeof("handling"));
                            strcpy(msgall->socket_msg, "handling");
                            socket_msg_rear = write_queue(socket_msg_rear, msgall);
                        }
                    }
                } else if (strstr(rcv_buff, "xxxxxxxxx")) {
                    ret = send(client_socket, "xxxxxxxxx", 10, 0);
                    if (ret <= 0) {
                        LOGE("副机发送消息失败:%s \n", "xxxxxxxxx");
                        /* 此处最好重新入队,再次发送。 */
                    }
                }
            }
            sleep(5);  // 5s心跳一次,server端15s检查一次
        }
        close(client_socket);  // 记得及时释放文件描述符(很宝贵的资源)
        sleep(10);
    }
}

下面是主机和副机的log打印:

客户端

image-20230927164453026

服务端:

image-20230927164622710

这个人很懒,什么都没留下~

回复

使用道具 举报

jkernet | 2023-9-27 21:05:51 | 显示全部楼层
好文章学习
回复 支持 反对

使用道具 举报

WangChong | 2023-9-27 22:11:01 | 显示全部楼层
学习了
回复

使用道具 举报

lsrly | 2023-10-4 09:34:24 | 显示全部楼层
学习打卡
回复

使用道具 举报

您需要登录后才可以回帖 登录 | 立即注册

本版积分规则