...
第一种时序数据,命令“insert into root.test.mydevice(timestamp, rpm, online) values (now(), 1321, true)”可以在IOTDB中读取数据,Python中用 from iotdb.Session import Session 就可以直接处理这个数据了。
第二种,需要通过DWFIO.py引入后处理。py引入后处理,DWFIO.py内容如下:
代码块 | ||||
---|---|---|---|---|
| ||||
import requests
import json
class DWFIO:
# 关于请求信息的参数配置
# url = "http://i-yopiw3lm.cloud.nelbds.org.cn:8180"
# username = "admin"
# passwd = "Dwf2021!"
'''
获取账户token
url: 链接
username: 用户名
passwd: 密码
'''
def __init__(self, url, username, passwd):
self.url = url
self.username = username
self.passwd = passwd
self.path = "/dwf/v1/omf"
self.headers = {
'accept': '*/*',
'Content-Type': 'application/json'
}
def update_token(self, token: str) -> dict:
self.headers["Authorization"] = token
return self.headers
def get_token(self) -> dict:
full_url = self.url + "/dwf/v1/app/login?password=" + self.passwd + "&userName=" + self.username
payload = {}
headers = self.headers
response = requests.request("GET", full_url, headers=headers, data=payload)
res_code = json.loads(response.text)["code"]
if res_code != 200:
print("get_token() error: ")
print(json.loads(response.text))
return -1
else:
return json.loads(response.text)["data"]
'''
通过类名称(classname)和条件(condition)查询关联类的对象
'''
def query_relations_by_condition(self, classname: str, condition: str) -> dict:
full_url = self.url + self.path + "/relations/" + classname + "/objects"
payload = json.dumps({
"condition": condition
})
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("query_relations_by_condition() error: ")
print(response.text)
return -1
else:
# 返回list, 其中的每一个元素都是一个字典类型的对象
return res["data"]
def create_obj(self, classname: str, obj: dict) -> dict:
full_url = self.url + self.path + "/entities/" + classname + "/objects-create"
payload = json.dumps([
obj
])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("create_obj() error: ")
print(response.text)
return -1
else:
# 新建成功
return res["data"]
'''
创建关联类
'''
def create_relations(self, classname: str, obj: dict) -> int:
full_url = self.url + self.path + "/relations/" + classname + "/objects-create"
'''
{
"leftOid": "3237F985D415A04685A0E573DF588DC5",
"rightOid": "3B07F010E21F7F4DBF9CB2F406463940"
}
'''
payload = json.dumps([
obj
])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("create_relations() error: ")
print(response.text)
return -1
else:
# 新建成功
return 1
def edit_obj(self, classname: str, obj: dict) -> int:
full_url = self.url + self.path + "/entities/" + classname + "/objects-update?forceUpdate=false"
payload = json.dumps([
obj
])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("edit_obj() error: ")
print(response.text)
return -1
else:
# 新建成功
return 1
'''
通过oid删除关联类
'''
def delete_relations_by_oid(self, classname: str, oid: str) -> int:
full_url = self.url + self.path + "/relations/" + classname + "/objects-delete"
payload = json.dumps([
oid
])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("delete_relations_by_oid() error: ")
print(response.text)
return -1
else:
# 新建成功
return 1
def delete_obj_by_oid(self, classname: str, oid: str) -> int:
full_url = self.url + self.path + "/entities/" + classname + "/objects-delete"
payload = json.dumps([
oid
])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("delete_obj_by_oid() error: ")
print(response.text)
return -1
else:
# 新建成功
return 1
def query_objects_by_condition(self, classname: str, condition: str) -> dict:
full_url = self.url + self.path + "/entities/" + classname + "/objects"
payload = json.dumps({
"condition": condition
})
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("query_objects_by_condition() error: ")
print(response.text)
return -1
else:
# 新建成功
return res['data']
'''
通过oid查询实体类
'''
def query_objects_by_oid(self, classname: str, oid: str) -> dict:
full_url = self.url + self.path + "/entities/" + classname + "/objects/" + oid
payload = json.dumps([])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if (not ("code" in res.keys())) or res["code"] != 200:
print("query_objects_by_oid() error: ")
print(response.text)
return -1
else:
# 返回list, 其中的每一个元素都是一个字典类型的对象
return res["data"]
def edit_relation_obj(self, classname: str, obj: dict) -> int:
full_url = self.url + self.path + "/relations/" + classname + "/objects-update"
payload = json.dumps([
obj
])
token = self.get_token()
headers = self.update_token(token)
response = requests.request("POST", full_url, headers=headers, data=payload)
res = json.loads(response.text)
if ("code" not in res.keys()) or res["code"] != 200:
print("edit_relation_obj() error: ")
print(response.text)
return -1
else:
# 新建成功
return 1
def edit_single_file_attribute(self, classname, oid, attribute_name, file_path):
full_url = self.url + self.path + "/classes/" + classname + "/objects/"+oid+"/attributes/"+attribute_name+"/bytes-update"
payload = {}
files = [
('file', ('table.txt', open(file_path, 'rb'), 'text/plain'))
]
headers = {
'accept': '*/*',
'Authorization': self.get_token()
}
res = requests.request("POST", full_url, headers=headers, data=payload, files=files)
res = res.json()
if (not ("code" in res.keys())) or res["code"] != 200:
print("edit_single_file_attribute() error: ")
print(res.text)
return -1
else:
# 文件添加成功
return 1
if __name__ == "__main__":
url = "http://172-20-0-213.ccyun.nelbds.org.cn:8180/api/app"
username = "admin"
passwd = "4771d95f"
my_dwf_io = DWFIO(url, username, passwd)
# obj = {
# "oid": "578546519AF48140AB018F93D9531331",
# "currentValue": None
# }
# my_dwf_io.edit_obj("OfflineData", obj)
res = my_dwf_io.query_objects_by_condition("Asset",
"")
# res = my_dwf_io.edit_single_file_attribute("FailureAnalysisApp",
# "16AD1C54E66D5D4BA63A6E592396C40B",
# "appLog",
# r"C:\Users\tbl\Desktop\table.txt")
# 读
res = my_dwf_io.query_objects_by_condition("FailureAnalysisApp",
"and obj.oid='1915C437CA3BF14CAC9BEC6235F0C0A3'")
print(res)
# 写
obj = {
"oid": '1915C437CA3BF14CAC9BEC6235F0C0A3', # edit oid必填
"appstatus": "修改的状态"
}
my_dwf_io.edit_obj("FailureAnalysisApp", obj)
print("修改后的结果")
res = my_dwf_io.query_objects_by_condition("FailureAnalysisApp",
"and obj.oid='1915C437CA3BF14CAC9BEC6235F0C0A3'")
print(res)
|