日韩无码专区无码一级三级片|91人人爱网站中日韩无码电影|厨房大战丰满熟妇|AV高清无码在线免费观看|另类AV日韩少妇熟女|中文日本大黄一级黄色片|色情在线视频免费|亚洲成人特黄a片|黄片wwwav色图欧美|欧亚乱色一区二区三区

RELATEED CONSULTING
相關(guān)咨詢
選擇下列產(chǎn)品馬上在線溝通
服務(wù)時間:8:30-17:00
你可能遇到了下面的問題
關(guān)閉右側(cè)工具欄

新聞中心

這里有您想知道的互聯(lián)網(wǎng)營銷解決方案
使用ZeroMQ消息庫在C和Python間共享數(shù)據(jù)

ZeroMQ 是一個快速靈活的消息庫,用于數(shù)據(jù)收集和不同編程語言間的數(shù)據(jù)共享。

創(chuàng)新互聯(lián)是一家專業(yè)提供和平企業(yè)網(wǎng)站建設(shè),專注與網(wǎng)站建設(shè)、成都網(wǎng)站制作、HTML5、小程序制作等業(yè)務(wù)。10年已為和平眾多企業(yè)、政府機構(gòu)等服務(wù)。創(chuàng)新互聯(lián)專業(yè)網(wǎng)站建設(shè)公司優(yōu)惠進行中。

作為軟件工程師,我有多次在要求完成指定任務(wù)時感到渾身一冷的經(jīng)歷。其中有一次,我必須在一些新的硬件基礎(chǔ)設(shè)施和云基礎(chǔ)設(shè)施之間寫一個接口,這些硬件需要 C 語言,而云基礎(chǔ)設(shè)施主要是用 Python。

實現(xiàn)的方式之一是 用 C 寫擴展模塊,Python 支持 C 擴展的調(diào)用??焖贋g覽文檔后發(fā)現(xiàn),這需要編寫大量的 C 代碼。這樣做的話,在有些情況下效果還不錯,但不是我喜歡的方式。另一種方式就是將兩個任務(wù)放在不同的進程中,并使用 ZeroMQ 消息庫 在兩者之間交換消息。

在發(fā)現(xiàn) ZeroMQ 之前,遇到這種類型的情況時,我選擇了編寫擴展的方式。這種方式不算太差,但非常費時費力。如今,為了避免那些問題,我將一個系統(tǒng)細分為獨立的進程,通過 通信套接字 發(fā)送消息來交換信息。這樣,不同的編程語言可以共存,每個進程也變簡單了,同時也容易調(diào)試。

ZeroMQ 提供了一個更簡單的過程:

  1. 編寫一小段 C 代碼,從硬件讀取數(shù)據(jù),然后把發(fā)現(xiàn)的東西作為消息發(fā)送出去。
  2. 使用 Python 編寫接口,實現(xiàn)新舊基礎(chǔ)設(shè)施之間的對接。

Pieter Hintjens 是 ZeroMQ 項目發(fā)起者之一,他是個擁有 有趣視角和作品 的非凡人物。

準備

本教程中,需要:

  • 一個 C 編譯器(例如 GCC 或 Clang)
  • libzmq 庫
  • Python 3
  • ZeroMQ 的 Python 封裝

Fedora 系統(tǒng)上的安裝方法:

 
 
 
  1. $ dnf install clang zeromq zeromq-devel python3 python3-zmq

Debian 和 Ubuntu 系統(tǒng)上的安裝方法:

 
 
 
  1. $ apt-get install clang libzmq5 libzmq3-dev python3 python3-zmq

如果有問題,參考對應項目的安裝指南(上面附有鏈接)。

編寫硬件接口庫

因為這里針對的是個設(shè)想的場景,本教程虛構(gòu)了包含兩個函數(shù)的操作庫:

  • fancyhw_init() 用來初始化(設(shè)想的)硬件
  • fancyhw_read_val() 用于返回從硬件讀取的數(shù)據(jù)

將庫的完整代碼保存到文件 libfancyhw.h 中:

 
 
 
  1. #ifndef LIBFANCYHW_H
  2. #define LIBFANCYHW_H
  3.  
  4. #include
  5. #include
  6.  
  7. // This is the fictitious hardware interfacing library
  8.  
  9. void fancyhw_init(unsigned int init_param)
  10. {
  11. srand(init_param);
  12. }
  13.  
  14. int16_t fancyhw_read_val(void)
  15. {
  16. return (int16_t)rand();
  17. }
  18.  
  19. #endif

這個庫可以模擬你要在不同語言實現(xiàn)的組件間交換的數(shù)據(jù),中間有個隨機數(shù)發(fā)生器。

設(shè)計 C 接口

下面從包含管理數(shù)據(jù)傳輸?shù)膸扉_始,逐步實現(xiàn) C 接口。

需要的庫

開始先加載必要的庫(每個庫的作用見代碼注釋):

 
 
 
  1. // For printf()
  2. #include
  3. // For EXIT_*
  4. #include
  5. // For memcpy()
  6. #include
  7. // For sleep()
  8. #include
  9.  
  10. #include
  11.  
  12. #include "libfancyhw.h"

必要的參數(shù)

定義 main 函數(shù)和后續(xù)過程中必要的參數(shù):

 
 
 
  1. int main(void)
  2. {
  3.     const unsigned int INIT_PARAM = 12345;
  4.     const unsigned int REPETITIONS = 10;
  5.     const unsigned int PACKET_SIZE = 16;
  6.     const char *TOPIC = "fancyhw_data";
  7.  
  8.     ...

初始化

所有的庫都需要初始化。虛構(gòu)的那個只需要一個參數(shù):

 
 
 
  1. fancyhw_init(INIT_PARAM);

ZeroMQ 庫需要實打?qū)嵉某跏蓟?。首先,定義對象 context,它是用來管理全部的套接字的:

 
 
 
  1. void *context = zmq_ctx_new();
  2.  
  3. if (!context)
  4. {
  5. printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
  6.  
  7. return EXIT_FAILURE;
  8. }

之后定義用來發(fā)送數(shù)據(jù)的套接字。ZeroMQ 支持若干種套接字,各有其用。使用 publish 套接字(也叫 PUB 套接字),可以復制消息并分發(fā)到多個接收端。這使得你可以讓多個接收端接收同一個消息。沒有接收者的消息將被丟棄(即不會入消息隊列)。用法如下:

 
 
 
  1. void *data_socket = zmq_socket(context, ZMQ_PUB);

套接字需要綁定到一個具體的地址,這樣客戶端就知道要連接哪里了。本例中,使用了 TCP 傳輸層(當然也有 其它選項,但 TCP 是不錯的默認選擇):

 
 
 
  1. const int rb = zmq_bind(data_socket, "tcp://*:5555");
  2.  
  3. if (rb != 0)
  4. {
  5. printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
  6.  
  7. return EXIT_FAILURE;
  8. }

下一步, 計算一些后續(xù)要用到的值。 注意下面代碼中的 TOPIC,因為 PUB 套接字發(fā)送的消息需要綁定一個主題。主題用于供接收者過濾消息:

 
 
 
  1. const size_t topic_size = strlen(TOPIC);
  2. const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
  3.  
  4. printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);

發(fā)送消息

啟動一個發(fā)送消息的循環(huán),循環(huán) REPETITIONS 次:

 
 
 
  1. for (unsigned int i = 0; i < REPETITIONS; i++)
  2. {
  3. ...

發(fā)送消息前,先填充一個長度為 PACKET_SIZE 的緩沖區(qū)。本庫提供的是 16 個位的有符號整數(shù)。因為 C 語言中 int 類型占用空間大小與平臺相關(guān),不是確定的值,所以要使用指定寬度的 int 變量:

 
 
 
  1. int16_t buffer[PACKET_SIZE];
  2.  
  3. for (unsigned int j = 0; j < PACKET_SIZE; j++)
  4. {
  5. buffer[j] = fancyhw_read_val();
  6. }
  7.  
  8. printf("Read %u data values\n", PACKET_SIZE);

消息的準備和發(fā)送的第一步是創(chuàng)建 ZeroMQ 消息,為消息分配必要的內(nèi)存空間??瞻椎南⑹怯糜诜庋b要發(fā)送的數(shù)據(jù)的:

 
 
 
  1. zmq_msg_t envelope;
  2.  
  3. const int rmi = zmq_msg_init_size(&envelope, envelope_size);
  4. if (rmi != 0)
  5. {
  6. printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
  7.  
  8. zmq_msg_close(&envelope);
  9.  
  10. break;
  11. }

現(xiàn)在內(nèi)存空間已分配,數(shù)據(jù)保存在 ZeroMQ 消息 “信封”中。函數(shù) zmq_msg_data() 返回一個指向封裝數(shù)據(jù)緩存區(qū)頂端的指針。第一部分是主題,之后是一個空格,最后是二進制數(shù)。主題和二進制數(shù)據(jù)之間的分隔符采用空格字符。需要遍歷緩存區(qū)的話,使用類型轉(zhuǎn)換和 指針算法。(感謝 C 語言,讓事情變得直截了當。)做法如下:

 
 
 
  1. memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
  2. memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
  3. memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t))

通過 data_socket 發(fā)送消息:

 
 
 
  1. const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
  2. if (rs != envelope_size)
  3. {
  4. printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
  5.  
  6. zmq_msg_close(&envelope);
  7.  
  8. break;
  9. }

使用數(shù)據(jù)之前要先解除封裝:

 
 
 
  1. zmq_msg_close(&envelope);
  2.  
  3. printf("Message sent; i: %u, topic: %s\n", i, TOPIC);

清理

C 語言不提供 垃圾收集 功能,用完之后記得要自己掃尾。發(fā)送消息之后結(jié)束程序之前,需要運行掃尾代碼,釋放分配的內(nèi)存:

 
 
 
  1. const int rc = zmq_close(data_socket);
  2.  
  3. if (rc != 0)
  4. {
  5. printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
  6.  
  7. return EXIT_FAILURE;
  8. }
  9.  
  10. const int rd = zmq_ctx_destroy(context);
  11.  
  12. if (rd != 0)
  13. {
  14. printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
  15.  
  16. return EXIT_FAILURE;
  17. }
  18.  
  19. return EXIT_SUCCESS;

完整 C 代碼

保存下面完整的接口代碼到本地名為 hw_interface.c 的文件:

 
 
 
  1. // For printf()
  2. #include
  3. // For EXIT_*
  4. #include
  5. // For memcpy()
  6. #include
  7. // For sleep()
  8. #include
  9.  
  10. #include
  11.  
  12. #include "libfancyhw.h"
  13.  
  14. int main(void)
  15. {
  16. const unsigned int INIT_PARAM = 12345;
  17. const unsigned int REPETITIONS = 10;
  18. const unsigned int PACKET_SIZE = 16;
  19. const char *TOPIC = "fancyhw_data";
  20.  
  21. fancyhw_init(INIT_PARAM);
  22.  
  23. void *context = zmq_ctx_new();
  24.  
  25. if (!context)
  26. {
  27. printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
  28.  
  29. return EXIT_FAILURE;
  30. }
  31.  
  32. void *data_socket = zmq_socket(context, ZMQ_PUB);
  33.  
  34. const int rb = zmq_bind(data_socket, "tcp://*:5555");
  35.  
  36. if (rb != 0)
  37. {
  38. printf("ERROR: ZeroMQ error occurred during zmq_ctx_new(): %s\n", zmq_strerror(errno));
  39.  
  40. return EXIT_FAILURE;
  41. }
  42.  
  43. const size_t topic_size = strlen(TOPIC);
  44. const size_t envelope_size = topic_size + 1 + PACKET_SIZE * sizeof(int16_t);
  45.  
  46. printf("Topic: %s; topic size: %zu; Envelope size: %zu\n", TOPIC, topic_size, envelope_size);
  47.  
  48. for (unsigned int i = 0; i < REPETITIONS; i++)
  49. {
  50. int16_t buffer[PACKET_SIZE];
  51.  
  52. for (unsigned int j = 0; j < PACKET_SIZE; j++)
  53. {
  54. buffer[j] = fancyhw_read_val();
  55. }
  56.  
  57. printf("Read %u data values\n", PACKET_SIZE);
  58.  
  59. zmq_msg_t envelope;
  60. const int rmi = zmq_msg_init_size(&envelope, envelope_size);
  61. if (rmi != 0)
  62. {
  63. printf("ERROR: ZeroMQ error occurred during zmq_msg_init_size(): %s\n", zmq_strerror(errno));
  64. zmq_msg_close(&envelope);
  65. break;
  66. }
  67. memcpy(zmq_msg_data(&envelope), TOPIC, topic_size);
  68.  
  69. memcpy((void*)((char*)zmq_msg_data(&envelope) + topic_size), " ", 1);
  70.  
  71. memcpy((void*)((char*)zmq_msg_data(&envelope) + 1 + topic_size), buffer, PACKET_SIZE * sizeof(int16_t));
  72. const size_t rs = zmq_msg_send(&envelope, data_socket, 0);
  73. if (rs != envelope_size)
  74. {
  75. printf("ERROR: ZeroMQ error occurred during zmq_msg_send(): %s\n", zmq_strerror(errno));
  76. zmq_msg_close(&envelope);
  77. break;
  78. }
  79. zmq_msg_close(&envelope);
  80.  
  81. printf("Message sent; i: %u, topic: %s\n", i, TOPIC);
  82.  
  83. sleep(1);
  84. }
  85.  
  86. const int rc = zmq_close(data_socket);
  87.  
  88. if (rc != 0)
  89. {
  90. printf("ERROR: ZeroMQ error occurred during zmq_close(): %s\n", zmq_strerror(errno));
  91.  
  92. return EXIT_FAILURE;
  93. }
  94.  
  95. const int rd = zmq_ctx_destroy(context);
  96.  
  97. if (rd != 0)
  98. {
  99. printf("Error occurred during zmq_ctx_destroy(): %s\n", zmq_strerror(errno));
  100.  
  101. return EXIT_FAILURE;
  102. }
  103.  
  104. return EXIT_SUCCESS;
  105. }

用如下命令編譯:

 
 
 
  1. $ clang -std=c99 -I. hw_interface.c -lzmq -o hw_interface

如果沒有編譯錯誤,你就可以運行這個接口了。貼心的是,ZeroMQ PUB 套接字可以在沒有任何應用發(fā)送或接受數(shù)據(jù)的狀態(tài)下運行,這簡化了使用復雜度,因為這樣不限制進程啟動的次序。

運行該接口:

 
 
 
  1. $ ./hw_interface
  2. Topic: fancyhw_data; topic size: 12; Envelope size: 45
  3. Read 16 data values
  4. Message sent; i: 0, topic: fancyhw_data
  5. Read 16 data values
  6. Message sent; i: 1, topic: fancyhw_data
  7. Read 16 data values
  8. ...
  9. ...

輸出顯示數(shù)據(jù)已經(jīng)通過 ZeroMQ 完成發(fā)送,現(xiàn)在要做的是讓一個程序去讀數(shù)據(jù)。

編寫 Python 數(shù)據(jù)處理器

現(xiàn)在已經(jīng)準備好從 C 程序向 Python 應用傳送數(shù)據(jù)了。

需要兩個庫幫助實現(xiàn)數(shù)據(jù)傳輸。首先是 ZeroMQ 的 Python 封裝:

 
 
 
  1. $ python3 -m pip install zmq

另一個就是 struct 庫,用于解碼二進制數(shù)據(jù)。這個庫是 Python 標準庫的一部分,所以不需要使用 pip 命令安裝。

Python 程序的第一部分是導入這些庫:

 
 
 
  1. import zmq
  2. import struct

重要參數(shù)

使用 ZeroMQ 時,只能向常量 TOPIC 定義相同的接收端發(fā)送消息:

 
 
 
  1. topic = "fancyhw_data".encode('ascii')
  2.  
  3. print("Reading messages with topic: {}".format(topic))

初始化

下一步,初始化上下文和套接字。使用 subscribe 套接字(也稱為 SUB 套接字),它是 PUB 套接字的天生伴侶。這個套接字發(fā)送時也需要匹配主題。

 
 
 
  1. with zmq.Context() as context:
  2.     socket = context.socket(zmq.SUB)
  3.  
  4.     socket.connect("tcp://127.0.0.1:5555")
  5.     socket.setsockopt(zmq.SUBSCRIBE, topic)
  6.  
  7.     i = 0
  8.  
  9.     ...

接收消息

啟動一個無限循環(huán),等待接收發(fā)送到 SUB 套接字的新消息。這個循環(huán)會在你按下 Ctrl+C 組合鍵或者內(nèi)部發(fā)生錯誤時終止:

 
 
 
  1.     try:
  2.         while True:
  3.  
  4.             ... # we will fill this in next
  5.  
  6.     except KeyboardInterrupt:
  7.         socket.close()
  8.     except Exception as error:
  9.         print("ERROR: {}".format(error))
  10.         socket.close()

這個循環(huán)等待 recv() 方法獲取的新消息,然后將接收到的內(nèi)容從第一個空格字符處分割開,從而得到主題:

 
 
 
  1. binary_topic, data_buffer = socket.recv().split(b' ', 1)

解碼消息

Python 此時尚不知道主題是個字符串,使用標準 ASCII 編解碼器進行解碼:

 
 
 
  1. topic = binary_topic.decode(encoding = 'ascii')
  2.  
  3. print("Message {:d}:".format(i))
  4. print("\ttopic: '{}'".format(topic))

下一步就是使用 struct 庫讀取二進制數(shù)據(jù),它可以將二進制數(shù)據(jù)段轉(zhuǎn)換為明確的數(shù)值。首先,計算數(shù)據(jù)包中數(shù)值的組數(shù)。本例中使用的 16 個位的有符號整數(shù)對應的是 struct 格式字符 中的 h

 
 
 
  1. packet_size = len(data_buffer) // struct.calcsize("h")
  2.  
  3. print("\tpacket size: {:d}".format(packet_size))

知道數(shù)據(jù)包中有多少組數(shù)據(jù)后,就可以通過構(gòu)建一個包含數(shù)據(jù)組數(shù)和數(shù)據(jù)類型的字符串,來定義格式了(比如“16h”):

 
 
 
  1. struct_format = "{:d}h".format(packet_size)

將二進制數(shù)據(jù)串轉(zhuǎn)換為可直接打印的一系列數(shù)字:

 
 
 
  1. data = struct.unpack(struct_format, data_buffer)
  2.  
  3. print("\tdata: {}".format(data))

完整 Python 代碼

下面是 Python 實現(xiàn)的完整的接收端:

 
 
 
  1. #! /usr/bin/env python3
  2.  
  3. import zmq
  4. import struct
  5.  
  6. topic = "fancyhw_data".encode('ascii')
  7.  
  8. print("Reading messages with topic: {}".format(topic))
  9.  
  10. with zmq.Context() as context:
  11. socket = context.socket(zmq.SUB)
  12.  
  13. socket.connect("tcp://127.0.0.1:5555")
  14. socket.setsockopt(zmq.SUBSCRIBE, topic)
  15.  
  16. i = 0
  17.  
  18. try:
  19. while True:
  20. binary_topic, data_buffer = socket.recv().split(b' ', 1)
  21.  
  22. topic = binary_topic.decode(encoding = 'ascii')
  23.  
  24. print("Message {:d}:".format(i))
  25. print("\ttopic: '{}'".format(topic))
  26.  
  27. packet_size = len(data_buffer) // struct.calcsize("h")
  28.  
  29. print("\tpacket size: {:d}".format(packet_size))
  30.  
  31. struct_format = "{:d}h".format(packet_size)
  32.  
  33. data = struct.unpack(struct_format, data_buffer)
  34.  
  35. print("\tdata: {}".format(data))
  36.  
  37. i += 1
  38.  
  39. except KeyboardInterrupt:
  40. socket.close()
  41. except Exception as error:
  42. print("ERROR: {}".format(error))
  43. socket.close()

將上面的內(nèi)容保存到名為 online_analysis.py 的文件。Python 代碼不需要編譯,你可以直接運行它。

運行輸出如下:

 
 
 
  1. $ ./online_analysis.py
  2. Reading messages with topic: b'fancyhw_data'
  3. Message 0:
  4.         topic: 'fancyhw_data'
  5.         packet size: 16
  6.         data: (20946, -23616, 9865, 31416, -15911, -10845, -5332, 25662, 10955, -32501, -18717, -24490, -16511, -28861, 24205, 26568)
  7. Message 1:
  8.         topic: 'fancyhw_data'
  9.         packet size: 16
  10.         data: (12505, 31355, 14083, -19654, -9141, 14532, -25591, 31203, 10428, -25564, -732, -7979, 9529, -27982, 29610, 30475)
  11. ...
  12. ...

小結(jié)

本教程介紹了一種新方式,實現(xiàn)從基于 C 的硬件接口收集數(shù)據(jù),并分發(fā)到基于 Python 的基礎(chǔ)設(shè)施的功能。借此可以獲取數(shù)據(jù)供后續(xù)分析,或者轉(zhuǎn)送到任意數(shù)量的接收端去。它采用了一個消息庫實現(xiàn)數(shù)據(jù)在發(fā)送者和處理者之間的傳送,來取代同樣功能規(guī)模龐大的軟件。

本教程還引出了我稱之為“軟件粒度”的概念,換言之,就是將軟件細分為更小的部分。這種做法的優(yōu)點之一就是,使得同時采用不同的編程語言實現(xiàn)最簡接口作為不同部分之間溝通的組件成為可能。

實踐中,這種設(shè)計使得軟件工程師能以更獨立、合作更高效的方式做事。不同的團隊可以專注于數(shù)據(jù)分析的不同方面,可以選擇自己中意的實現(xiàn)工具。這種做法的另一個優(yōu)點是實現(xiàn)了零代價的并行,因為所有的進程都可以并行運行。ZeroMQ 消息庫 是個令人贊嘆的軟件,使用它可以讓工作大大簡化。 


當前標題:使用ZeroMQ消息庫在C和Python間共享數(shù)據(jù)
文章分享:http://m.5511xx.com/article/coeidio.html