背景介绍:项目使用了两台linux设备来控制同一个售货机,但是在后台是希望将两台设备看作一体。所以就少不了两台设备之间的简单通信。
我希望两者的socket能够保持长连接,并且能够断连自恢复。需求就这么简单!接下来看实现:
代码相对来说比较长,所以就以注释的形式进行说明了!
先来看server端实现:
void * socket_server_func(void * args)
{
int ret = 0;
char rcv_buff[128] = {0};
// 这里添加了一个SOCK_NONBLOCK的socket类型,表示这个socket采用非阻塞形式通信
int server_socket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("192.168.1.10");
server_addr.sin_port = htons("9090");
bind(server_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
listen(server_socket, 20);
struct sockaddr client_addr;
socklen_t client_addr_size = sizeof(client_addr);
/*****************************************************************************/
/* 保持长连接的关键代码 */
/*****************************************************************************/
while (1) {
// server端的第一层循环,用于在客户端断开后,重新建立新的client_socket
int client_socket = accept(server_socket, (struct sockaddr *)&client_addr, &client_addr_size);
while (1) {
// server端的第二层循环,用于发送消息以及对副机的反馈进行处理。兼顾检测客户端心跳
if (socket_msg_queue->next != NULL) { // socket消息队列不为空的情况下
socket_msg_rear = read_queue(socket_msg_top, socket_msg_rear); // 出队
ret = send(client_socket, out->msgall->socket_msg, sizeof(out->msgall->socket_msg), 0);
if (ret <= 0) {
LOGE("副机发送消息失败:%s \n", out->msgall->socket_msg);
socket_msg_rear = write_queue(socket_msg_rear, out->msgall);
} else {
;
}
// 下面是一些项目需要的逻辑实现,就不多做介绍了
sleep(10);
memset(rcv_buff, 0, sizeof(rcv_buff));
ret = recv(client_socket, rcv_buff, 128, 0);
if (ret > 0) {
if (strstr(rcv_buff, "reboot")) {
LOGD("副机反馈:reboot\n");
} else if (strstr(rcv_buff, "noevent")) {
LOGD("副机反馈:noevent\n");
slave_event_handling = 0;
} else if (strstr(rcv_buff, "handling")) {
LOGD("副机反馈:handling\n");
slave_event_handling = 1;
}
}
free(out->msgall->socket_msg);
free(out->msgall);
}
// 检测心跳副机心跳,这里这么写,主要是因为我的副机除了心跳没有其他主动发消息的业务。所以,主机如果在一定时间内未收到来自副机的消息,那它一定是掉线了
ret = recv(client_socket, rcv_buff, 128, 0);
if (ret > 0) {
// 对客户端的心跳做镜像恢复,有助于副机判断是否保持连接
send(client_socket, rcv_buff, sizeof(rcv_buff), 0);
LOGD("主机收到副机Socket消息:%s \n", rcv_buff);
if (strstr(rcv_buff, "heartbeat")) {
slave_running = 1;
} else {
// 其实从理论上来讲,这个分支是多余的,但是为了严谨嘛
LOGE("副机已离线!\n");
sleep(5);
break;
}
} else {
// 如果在15s以内没有检测到副机的心跳,判断它离线
LOGE("副机已离线!\n");
sleep(5);
break; // 退出内层循环
}
sleep(15); // 这里的sleep很关键,需要大于客户端心跳的事件间隙
}
close(client_socket); // 关闭当前套接字,这个很关键。
sleep(10);
// 然后重新进入外层循环
}
close(server_socket);
}
然后是client端:
/**
* 与服务端建立连接并保持连接。
* 1. 连接服务端
* 2. 读取socket队列判断有无待发送消息
* 3. 发送消息(2 == true)
* 4. 接收消息
* 5. 消息处理(4 == true)
*/
void * socket_client_func(void * args)
{
char rcv_buff[128];
int client_socket = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, IPPROTO_TCP);
if (client_socket < 0) {
LOGE("socket error!\n");
return NULL;
}
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = inet_addr("192.168.1.10");
server_addr.sin_port = htons("9090");
// 与server端同样的原理,client端也采用双层循环,外部做重连,内层保持连接。
while (1) {
int ret = connect(client_socket, (struct sockaddr *)&server_addr, sizeof(server_addr));
if (ret) {
LOGE("主副机建立Socket连接失败!\n");
sleep(5); // 很有可能测试主机没上线,所以不要立即continue,做一定延时比较好
continue;
}
LOGI("主副机建立Socket连接成功!\n");
while (1) {
// 发布心跳消息,可以自定义心跳消息,使用时间戳可能比较好
ret = send(client_socket, "heartbeat", sizeof("heartbeat"), 0);
if (ret > 0) {
LOGD("副机发布心跳消息:heartbeat\n");
}
sleep(5);
memset(rcv_buff, 0, sizeof(rcv_buff));
// 获取来自主机的镜像回复
ret = recv(client_socket, rcv_buff, sizeof(rcv_buff), 0);
if (ret > 0) {
if (strstr(rcv_buff, "heartbeat")) { // 根据镜像回复判断连接状态
LOGD("心跳检测成功!\n");
} else {
break; // 如果没有收到主机回复,则立即重连
}
}
// 下面是项目的具体逻辑实现,感兴趣的可以看看
#if 0
if (socket_msg_queue->next != NULL) {
socket_msg_rear = read_queue(socket_msg_top, socket_msg_rear);
if (strstr(out->msgall->socket_msg, "xxxxxxxx")){
/* 副机暂时没有向主机主动发送消息的需求 */
ret = send(client_socket, "xxxxxxxx", sizeof("xxxxxxxx"), 0);
if (ret <= 0) {
LOGE("副机发送消息失败:%s \n", "xxxxxxxx");
mqtt_message_all * msgall = (mqtt_message_all *)klmalloc(sizeof(mqtt_message_all));
msgall->socket_msg = (char *)klmalloc(sizeof("xxxxxxxx"));
strcpy(msgall->socket_msg, "xxxxxxxx");
socket_msg_rear = write_queue(socket_msg_rear, msgall);
} else {
free(out->msgall->socket_msg);
free(out->msgall);
}
} else if (strstr(out->msgall->socket_msg, "nnnnnnnn")) {
;
}
}
#endif // 0
memset(rcv_buff, 0, sizeof(rcv_buff));
ret = recv(client_socket, rcv_buff, sizeof(rcv_buff), 0);
if (ret > 0) {
LOGD("副机收到socket消息:%s \n", rcv_buff);
send(client_socket, rcv_buff, sizeof(rcv_buff), 0); // 镜像返回消息
if (strstr(rcv_buff, "reboot")) {
auto_reboot();
} else if (strstr(rcv_buff, "get_event_status")) {
if (event_handling == 0) {
ret = send(client_socket, "noevent", 8, 0);
if (ret <= 0) {
LOGE("副机发送消息失败:%s \n", "noevent");
mqtt_message_all * msgall = (mqtt_message_all *)klmalloc(sizeof(mqtt_message_all));
msgall->socket_msg = (char *)klmalloc(sizeof("noevent"));
strcpy(msgall->socket_msg, "noevent");
socket_msg_rear = write_queue(socket_msg_rear, msgall);
}
} else {
ret = send(client_socket, "handling", 9, 0);
if (ret <= 0) {
LOGE("副机发送消息失败:%s \n", "handling");
mqtt_message_all * msgall = (mqtt_message_all *)klmalloc(sizeof(mqtt_message_all));
msgall->socket_msg = (char *)klmalloc(sizeof("handling"));
strcpy(msgall->socket_msg, "handling");
socket_msg_rear = write_queue(socket_msg_rear, msgall);
}
}
} else if (strstr(rcv_buff, "xxxxxxxxx")) {
ret = send(client_socket, "xxxxxxxxx", 10, 0);
if (ret <= 0) {
LOGE("副机发送消息失败:%s \n", "xxxxxxxxx");
/* 此处最好重新入队,再次发送。 */
}
}
}
sleep(5); // 5s心跳一次,server端15s检查一次
}
close(client_socket); // 记得及时释放文件描述符(很宝贵的资源)
sleep(10);
}
}
下面是主机和副机的log打印:
客户端
服务端:
这个人很懒,什么都没留下~