导入的csv有两个,第一、导入的是iotdb时序数据,第二、导入实体类数据。
第一种时序数据,命令“insert into root.test.mydevice(timestamp, rpm, online) values (now(), 1321, true)”可以在IOTDB中读取数据,Python中用 from iotdb.Session import Session 就可以直接处理这个数据了。
第二种,需要通过DWFIO.py引入后处理。
import requests import json class DWFIO: # 关于请求信息的参数配置 # url = "http://i-yopiw3lm.cloud.nelbds.org.cn:8180" # username = "admin" # passwd = "Dwf2021!" ''' 获取账户token url: 链接 username: 用户名 passwd: 密码 ''' def __init__(self, url, username, passwd): self.url = url self.username = username self.passwd = passwd self.path = "/dwf/v1/omf" self.headers = { 'accept': '*/*', 'Content-Type': 'application/json' } def update_token(self, token: str) -> dict: self.headers["Authorization"] = token return self.headers def get_token(self) -> dict: full_url = self.url + "/dwf/v1/app/login?password=" + self.passwd + "&userName=" + self.username payload = {} headers = self.headers response = requests.request("GET", full_url, headers=headers, data=payload) res_code = json.loads(response.text)["code"] if res_code != 200: print("get_token() error: ") print(json.loads(response.text)) return -1 else: return json.loads(response.text)["data"] ''' 通过类名称(classname)和条件(condition)查询关联类的对象 ''' def query_relations_by_condition(self, classname: str, condition: str) -> dict: full_url = self.url + self.path + "/relations/" + classname + "/objects" payload = json.dumps({ "condition": condition }) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("query_relations_by_condition() error: ") print(response.text) return -1 else: # 返回list, 其中的每一个元素都是一个字典类型的对象 return res["data"] def create_obj(self, classname: str, obj: dict) -> dict: full_url = self.url + self.path + "/entities/" + classname + "/objects-create" payload = json.dumps([ obj ]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("create_obj() error: ") print(response.text) return -1 else: # 新建成功 return res["data"] ''' 创建关联类 ''' def create_relations(self, classname: str, obj: dict) -> int: full_url = self.url + self.path + "/relations/" + classname + "/objects-create" ''' { "leftOid": "3237F985D415A04685A0E573DF588DC5", "rightOid": "3B07F010E21F7F4DBF9CB2F406463940" } ''' payload = json.dumps([ obj ]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("create_relations() error: ") print(response.text) return -1 else: # 新建成功 return 1 def edit_obj(self, classname: str, obj: dict) -> int: full_url = self.url + self.path + "/entities/" + classname + "/objects-update?forceUpdate=false" payload = json.dumps([ obj ]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("edit_obj() error: ") print(response.text) return -1 else: # 新建成功 return 1 ''' 通过oid删除关联类 ''' def delete_relations_by_oid(self, classname: str, oid: str) -> int: full_url = self.url + self.path + "/relations/" + classname + "/objects-delete" payload = json.dumps([ oid ]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("delete_relations_by_oid() error: ") print(response.text) return -1 else: # 新建成功 return 1 def delete_obj_by_oid(self, classname: str, oid: str) -> int: full_url = self.url + self.path + "/entities/" + classname + "/objects-delete" payload = json.dumps([ oid ]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("delete_obj_by_oid() error: ") print(response.text) return -1 else: # 新建成功 return 1 def query_objects_by_condition(self, classname: str, condition: str) -> dict: full_url = self.url + self.path + "/entities/" + classname + "/objects" payload = json.dumps({ "condition": condition }) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("query_objects_by_condition() error: ") print(response.text) return -1 else: # 新建成功 return res['data'] ''' 通过oid查询实体类 ''' def query_objects_by_oid(self, classname: str, oid: str) -> dict: full_url = self.url + self.path + "/entities/" + classname + "/objects/" + oid payload = json.dumps([]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if (not ("code" in res.keys())) or res["code"] != 200: print("query_objects_by_oid() error: ") print(response.text) return -1 else: # 返回list, 其中的每一个元素都是一个字典类型的对象 return res["data"] def edit_relation_obj(self, classname: str, obj: dict) -> int: full_url = self.url + self.path + "/relations/" + classname + "/objects-update" payload = json.dumps([ obj ]) token = self.get_token() headers = self.update_token(token) response = requests.request("POST", full_url, headers=headers, data=payload) res = json.loads(response.text) if ("code" not in res.keys()) or res["code"] != 200: print("edit_relation_obj() error: ") print(response.text) return -1 else: # 新建成功 return 1 def edit_single_file_attribute(self, classname, oid, attribute_name, file_path): full_url = self.url + self.path + "/classes/" + classname + "/objects/"+oid+"/attributes/"+attribute_name+"/bytes-update" payload = {} files = [ ('file', ('table.txt', open(file_path, 'rb'), 'text/plain')) ] headers = { 'accept': '*/*', 'Authorization': self.get_token() } res = requests.request("POST", full_url, headers=headers, data=payload, files=files) res = res.json() if (not ("code" in res.keys())) or res["code"] != 200: print("edit_single_file_attribute() error: ") print(res.text) return -1 else: # 文件添加成功 return 1 if __name__ == "__main__": url = "http://172-20-0-213.ccyun.nelbds.org.cn:8180/api/app" username = "admin" passwd = "4771d95f" my_dwf_io = DWFIO(url, username, passwd) # obj = { # "oid": "578546519AF48140AB018F93D9531331", # "currentValue": None # } # my_dwf_io.edit_obj("OfflineData", obj) res = my_dwf_io.query_objects_by_condition("Asset", "") # res = my_dwf_io.edit_single_file_attribute("FailureAnalysisApp", # "16AD1C54E66D5D4BA63A6E592396C40B", # "appLog", # r"C:\Users\tbl\Desktop\table.txt") # 读 res = my_dwf_io.query_objects_by_condition("FailureAnalysisApp", "and obj.oid='1915C437CA3BF14CAC9BEC6235F0C0A3'") print(res) # 写 obj = { "oid": '1915C437CA3BF14CAC9BEC6235F0C0A3', # edit oid必填 "appstatus": "修改的状态" } my_dwf_io.edit_obj("FailureAnalysisApp", obj) print("修改后的结果") res = my_dwf_io.query_objects_by_condition("FailureAnalysisApp", "and obj.oid='1915C437CA3BF14CAC9BEC6235F0C0A3'") print(res)