之前有写过一篇爱星物联平台设备OTA功能介绍的文章《今天给设备来了一次OTA》,如今源码在手我们对着功能来分析下他的代码吧
为了让大家更清晰的了解设备升级的流程, 我拿了爱星物联一张时序图;
下面我将根据以下几个步骤介绍爱星物联开源项目的OTA功能;
- OTA的数据下发数据包
- OTA的全量、灰度发布逻辑
- OTA的升级版本检查
- OTA的升级进度上报
OTA的下发数据包
{
"header": {...},
"payload": {
"param": {
"chanel": 1, //升级渠道(1-云端、2-APP)
"pointVer": "1.1.0",//指定目标版本
"baseVer": "1.1.0",//固件最低兼容版本
"mcuBaseVer": "1.0.0",//mcu最低兼容版本
"otaType": "module_ota_all",//OTA升级类型 module_ota_all、module_mcu_all、module_extand_all、module_bt_all、module_zigbee_all
"appUrl": "https://xxx.com/xxx_app.bin",//oss永久有效的外链地址
"mcuUrl": "https://xxx.com/xxx_mcu.bin",//oss永久有效的MCU升级包外链地址
"md5": "12312312312312",//前待升级的固件包文件的MD5值
"pubId": "发布Id",//发布编号(升级结果需要回传)
"otaVer": "1.0.1"//OTA当前升级版本(升级结果需要回传)
}
}
}
OTA的全量、灰度发布逻辑
在我们发布升级的时候,系统提供了全量发布、灰度发布两种方式,对于这两种方式系统主要是通过规则筛选出对应的设备Id进行推送;
全量:不需要其它条件,只要获取当前产品的所有设备Id都推送OTA升级即可;
//iot_device_service\service\iot_ota_upgrade_record.go
t := ormMysql.Use(iotmodel.GetDB()).TIotDeviceTriad
totalCount, errCount := do.Count()
if errCount != nil {
return 0, errCount}
if totalCount == 0 {
return 0, errors.New("无设备数据")
}
if _, ok := config.DBMap["iot_device"]; !ok {
return 0, errors.New("数据库未初始化")
}
go s.asyncPublishOta(totalCount, req)
因为OTA发布涉及设备较多,会有一定的耗时,我们不可能一直等待处理完成,这里使用协程处理发布的动作;下面是asyncPublishOta方法,分页查询设备并推送;并且调用pushPublishRecord推送发布进度;
func (s IotOtaUpgradeRecordSvc) asyncPublishOta(totalCount int64, req *proto.GenerateUpgradeDeviceRequest) error {
t := ormMysql.Use(iotmodel.GetDB()).TIotDeviceTriad
do := t.WithContext(context.Background()).Where(t.ProductKey.Eq(req.ProductKey))
var limit int = 500
pageCount := int(math.Ceil(float64(totalCount) / float64(limit)))
if pageCount == 0 {
s.pushPublishRecord(req.PublishId, 0, 0, 4, "无设备数据")
return errors.New("无设备数据")
}
var dataDB, ok = config.DBMap["iot_device"]
if !ok {
s.pushPublishRecord(req.PublishId, 0, int(totalCount), 4, "数据库未初始化")
return errors.New("数据库未初始化")
}
dataTx := orm.Use(dataDB)
var successCount int = 0
for page := 1; page <= pageCount; page++ {
offset := limit * (page - 1)
var result []DeviceSimpleInfo
_, err := do.ScanByPage(&result, offset, limit)
if err != nil {
return err
}
if len(result) == 0 {
break
}
tRecord := dataTx.TIotOtaUpgradeRecord
list := []*model.TIotOtaUpgradeRecord{}
for _, res := range result {
version, isAuto, _ := s.getDeviceCachedIsAutoAndVer(res.Did)
list = append(list, &model.TIotOtaUpgradeRecord{
Id: iotutil.GetNextSeqInt64(),
Version: req.Version,
PublishId: req.PublishId,
ProductKey: req.ProductKey,
DeviceId: res.Did,
OriginalVersion: res.DeviceVersion,
Status: 1,
IsGray: req.IsGray,
TenantId: req.TenantId,
Area: res.Country,
FwVer: version,
IsAutoUpgrade: isAuto,
})
}
err = tRecord.WithContext(context.Background()).Create(list...)
if err != nil {
break
}
//如果为强制升级
if req.UpgradeMode == 2 {
s.batchPublishNotice(req, list)
}
s.batchSetDeviceCached(req, list)
successCount += len(list)
s.pushPublishRecord(req.PublishId, successCount, int(totalCount), 4, "发布中")
//清空
list = make([]*model.TIotOtaUpgradeRecord, 0)
}
s.pushPublishRecord(req.PublishId, successCount, int(totalCount), 3, "ok")
return nil}
在方法中,我们能看到强制的的概念,如果是强制升级我们将推送mqtt消息给到APP,让APP限制用户对设备的控制;APP收到主题“iotprotocol.NOTICE_HEAD_UPGRADE_NOTICE_NAME”的消息,处理限制逻辑;
//推送升级通知
var obj iotprotocol.PackNotice
buf, _ := obj.Encode(iotprotocol.NOTICE_HEAD_UPGRADE_NOTICE_NAME, map[string]interface{}{
"version": req.Version,
"isGray": req.IsGray,
"pubId": req.PublishId,
"isAuto": req.IsAuto,
"upgradeMode": req.UpgradeMode,
"hasForceUpgrade": req.UpgradeMode == 2,
})
_, pubErr := rpcClient.ClientMqttService.BatchPublish(context.Background(), &proto.BatchPublishMessage{
TopicFullNameList: topics,
MessageContent: string(buf),
Qos: proto.Qos_ExactlyOnce,
Retained: false,
})
//这里需要注意,如果是强制升级、并且用户统一了自动升级,那么系统需要直接给用户的设备推送升级指令
// device.IsAutoUpgrade == true ota升级包是开启自动升级
// req.IsAuto == 1 用户授权自动升级
if device.IsAutoUpgrade && req.IsAuto == 1{
s.publishUpgrade(req, device.DeviceId)
}
灰度升级:有以下三种模式,其中按比例、按数量灰度将随机获取设备升级,指定设备则是指定具体设备进行升级,方便ota测试过程;
按比例获取设备发布升级
指定数量的设备发布升级
批量指定设备升级
这里获取比例和指定数量,代码中是使用sql语句随机获取的方式。
if req.GrayType == iotconst.OTA_GRAY_TYPE_SCALE { //比例随机
str.WriteString(fmt.Sprintf(" where C.ROW_NUM<=(@row_num*%v)", float64(req.GrayScale)/100))
} else if req.GrayType == iotconst.OTA_GRAY_TYPE_NUMBER {//指定数量随机
str.WriteString(fmt.Sprintf(" where C.ROW_NUM<=%v", req.GrayScale))
}
指定设备则是,直接提供设备Id列表搜索设备信息
str.WriteString(fmt.Sprintf(" where C.did in (%v)", "'" + strings.Join(req.DeviceIds, "'")
OTA的升级版本检查
// iot_product_service\handler\opm_ota_publish_handler.go
// 详见CheckOtaVersion方法
// 获取设备缓存的状态数据
devStatus, err := iotredis.GetClient().HGetAll(context.Background(), iotconst.HKEY_DEV_DATA_PREFIX+request.DeviceId).Result()
if err != nil {
return nil
}
//根据产品配置的固件类型逐一检查升级情况,注意这里会优先检查通讯模组固件
request.FirmwareTypes = []int32{iotconst.FIRMWARE_TYPE_MODULE}
res, err = s.CheckOtaVersion(request, "")
//如果res.Code == 101,标识模组固件不需要升级
//则获取产品配置的其它固件类型,这里有蓝牙、zigbee、扩展固件类型
for _, info := range firmList {
var version, versionKey string
//这里通过固件类型获取缓存的设备固件的真实版本字段Key
switch info.FirmwareType {
case iotconst.FIRMWARE_TYPE_MODULE:
continue //模组固件已经检查
case iotconst.FIRMWARE_TYPE_BLE:
versionKey = "bleVer"
case iotconst.FIRMWARE_TYPE_ZIGBEE:
versionKey = "zigbeeVer"
case iotconst.FIRMWARE_TYPE_EXTAND:
versionKey = "extends"
case iotconst.FIRMWARE_TYPE_MCU:
versionKey = "mcuVer"
default:
continue
}
//这里检查如果有上报对应固件版本号,才会检查升级情况
if val, ok := devStatus[versionKey]; ok && val != "" {
//扩展版本号赋值
//因为扩展固件可能会存在多个,所以这里扩展固件版本上报的数据为数组,需要特殊处理
if versionKey == "extends" {
var extends []iotprotocol.ExtendItem
if err := iotutil.JsonToStruct(val, &extends); err == nil {
for _, extend := range extends {
if extend.Key == info.FirmwareKey {
version = extend.Ver
break
}
}
}
} else {
version = iotutil.ToString(val)
}
}
if version != "" {
request.Version = version
request.FirmwareTypes = []int32{info.FirmwareType}
res, err = s.CheckOtaVersion(request, info.FirmwareKey)
if res.Code == 200 {
//存在升级数据,返回升级信息
break
}
}
}
OTA的升级进度上报
//iot_device_service\service\ota_upgrade_device.go
//详见OtaUpgradeDevice方法
//这里是订阅设备上报topic:{ProductKey}/{DeviceId}/upgrade的处理逻,记录ota上报的进度和结果,在根据结果标记升级状态,清理升级缓存等;
//上报数据示例如下:{\"code\":0,\"progress\":90,\"otaState\":\"Downloading\",\"otaVer\":\"1.0.1\",\"pubId\":\"xxxxxxx\"}}
//otaState: 该字段表示OTA的状态,主要有以下三种状态
// Downloading: 下载中,当OTA处于此状态时,设备将会上报下载进度。而上报进度时间间隔以10 - 30%的间隔上报。
// Installing: 安装中,当OTA处于此状态时,设备将会处于离线状态。而该状态是设备即将进入离线状态前发送给云端的。
//code:
// 0: 表示OTA成功
// 1:表示下载失败
// 2:表示安装失败
// 3:协议数据错误
// 4:OTA数据包错误
//progress:OTA上报的进度
// sendAppMessage 此方法将升级结果推送给APP,APP解除正在的限制
device, err := s.getDeviceCached()
if err != nil {
iotlogger.LogHelper.Error("设备信息获取失败,设备Id=" + s.DevId)
return
} else {
userId, _ = iotutil.ToInt64Err(device["userId"])
homeId, _ = iotutil.ToInt64Err(device["homeId"])
appKey = device["appKey"]
tenantId = device["tenantId"]
productKey = device["productKey"]
area = device["country"]
fwVer = device["fwVer"]
}
svc := IotOtaUpgradeRecordSvc{Ctx: context.Background()}
err = svc.ReportUpgradeResult(s.DevId, productKey, version, tenantId, pubId, area, fwVer, state, code, progress)
if err != nil {
iotlogger.LogHelper.Error("修改进度失败", s.DevId, state, code, progress)
return
}
isSuccess, err := s.checkStatusIsUpgradeSuccess(state, code)
if err != nil {
// sendAppMessage 将升级结果推送给APP,APP解除正在的限制
s.sendAppMessage(appKey, tenantId, s.DevId, homeId, userId, false, map[string]string{})
} else {
if isSuccess {
var needSetCached = true
//判断版本当前升级版本是否大于ota升级版本
compareRes, _ := iotutil.VerCompare(version, fwVer)
if compareRes == 1 {
needSetCached = false
}
if needSetCached {
//升级成功需要更新升级信息
s.setDeviceCached(map[string]string{
iotconst.FIELD_UPGRADE_HAS: "false",
iotconst.FIELD_UPGRADE_MODE: "0",
iotconst.FIELD_UPGRADE_RUNNING: "false",
iotconst.FIELD_UPGRADE_STATE: "",
iotconst.FIELD_UPGRADE_PROGRESS: "0",
})
} else {
//升级成功需要更新升级信息
s.setDeviceCached(map[string]string{
//iotconst.FIELD_IS_FW_VER: version,
iotconst.FIELD_UPGRADE_HAS: "false",
iotconst.FIELD_UPGRADE_STATE: "",
iotconst.FIELD_UPGRADE_PROGRESS: "0",
iotconst.FIELD_UPGRADE_RUNNING: "false",
})
}
//推送固件升级结果消息
s.sendAppMessage(appKey, tenantId, s.DevId, homeId, userId, true, map[string]string{})
} else {
if s.isUpgradeRunning(state, code) {
s.setDeviceCached(map[string]string{
iotconst.FIELD_UPGRADE_HAS: "true",
iotconst.FIELD_UPGRADE_PROGRESS: iotutil.ToString(progress),
iotconst.FIELD_UPGRADE_STATE: state,
iotconst.FIELD_UPGRADE_RUNNING: "true",
})
}
}
}
以上代码判断为平台对于固件OTA升级的代码片段,欢迎大家一起下载开源代码研究哦;