程序(Nodes)間的溝通訊息,是透過名為主題(Topic)的渠道來傳遞。一個Topic只包含一種訊息格式,程序可以在Topic上發佈訊息(傳送),也可以從Topic上訂閱訊息(接收),但他們並不知道自己在跟誰溝通。同一個Topic上可以有很多人發佈訊息,也可以很多人訂閱訊息。為增加效率,一個渠道上只有一個訊息,無駐列。
訊息的發送接收是利用Micro Object Request Broker(uORB)來達成,而uORB用來執行一個簡單的發佈-訂閱流程。
- 發佈一個主題(Topic):
發佈一個主題包含三個動作:定義(define)、廣告(advertise)、發佈更新(publish updates)。 - 定義(define)
寫一個.h檔(標頭檔),內容包含兩個動作:
(1)用ORB_DECLARE()宣告主題的名稱。
(2)定義主題的內容結構(structure)。
範例:topic.h
/* declare the topic */ORB_DECLARE(random_integer);
/* define the data structure that will be published where subscribers can see it */
struct random_integer_data{
int r;
};
注意,主題的名稱最好有意義,並且利用底線隔在字與字之間,把最一般性的字眼擺在最前面,例如raw sensor data,最好取名為sensor_raw。
定義一個主題,除了標頭檔,還必須在發佈主題的程式碼裡用ORB_DEFINE()建構主題資料結構。(解釋:"宣告"只是告訴編譯器變數的存在,"定義"是賦予變數初值,可以在同一行程式碼同時宣告並定義一個變數,但是在標頭檔裡,最好只是宣告變數而不要定義變數,否則若不只一個程式碼含入標頭檔時,會重複定義變數而產生錯誤訊息。在此處,標頭檔宣告有一個主題存在,但是在發佈主題的程式碼裡面才真正建立主題,而且一個主題只能建立一次,否則會發生錯誤。)
主題的資料結構包含:主題名稱、資料大小。
範例:publisher.c
#include <topic.h>
/* create topic metadata */
ORB_DEFINE(random_integer, struct random_integer_data); - 廣告(advertise)
發佈主題前,需用orb_advertise()來廣告主題。
int orb_advertise(const struct orb_metadata *meta, const void *data);
廣告的同時,也發布了第一筆資料。
第一個參數是一個指標,指向用ORB_DEFINE()建立出來的結構,這個指標由ORB_ID()函式來提供,ORB_ID()的輸入參數為主題的名稱。第二個參數為所要發佈的資料(須符合標頭檔定義的主題內容結構)。
廣告主題不能用於中斷程序中。同一時間只能由一個發佈者廣告一個主題,且用close()結束之後才可由另一個發佈者廣告,close()的輸入參數為orb_advertise()函數的回傳值(handler)。 - 發佈主題(publish updates)
orb_advertise()函式的回傳值(handler)可用來將新資料發佈。使用以下的函示:
int orb_publish(const struct orb_metadata *meta, int handle, const void *data);
第一個參數為ORB_ID()函式提供之指標。
第二個參數為orb_advertise()函式的回傳值(handle)。
第三個參數為所要發佈的資料。
ORB並無資料暫存,多個發佈者發布資料時,訂閱者只能看到最新的一筆發佈資料。 - 訂閱(Subscribing)
- 訂閱(Subscribing)
訂閱主題需先將定義主題的標頭檔含入,利用以下函式訂閱:
int orb_subscribe(const struct orb_metadata *meta);
輸入參數為ORB_ID()函式提供之指標。
使用以下函式來取消訂閱:
int orb_unsubscribe(int handle);
輸入參數為orb_subscribe()函式的回傳值(handler)。 - 從主題複製資料
使用者先宣告'一個用來儲存資料的變數(記憶體空間),再將主題上的資料複製到這個變數,利用以下函式來複製資料:
int orb_copy(const struct orb_metadata *meta, int handle, void *buffer);
第二個參數是orb_subscribe()函式的回傳值(handle)。
第三個參數為預先宣告用來儲存資料的記憶體空間。
複製的動作極微小且快速,可以保證取得的資料是最新的值。 - 檢查是否有更新
利用以下函式,使用者可以檢查最後一次使用orb_copy()之後,主題資料是否有更新。
int orb_check(int handle, bool *updated);
第二個參數是orb_subscribe()函式的回傳值(handle)。
第三個參數回傳false(未更新)、true(已更新)。
利用以下函式,可以得知主題最後更新資料的時間。
int orb_stat(int handle, uint64_t *time); - 等待更新
美一個主題的訂閱及發布,都可視為是對一個檔案的讀取和寫入,可以用poll()函式來判斷一個檔案是否已經可以讀取/寫入,poll()會在設定的timeout時間內等待並測是否個事件是否發生,等待期間城市不會繼續執行而是停留在poll()函式(block),直到poll()結束並返回。如果偵測到特定事件的發生,poll()返回並回傳值>0值,如果timeout,poll()返回並回傳0,如果錯誤poll()返回並回傳-1。
#include <poll.h>
int poll(struct pollfd fds[], nfds_t nfds, int timeout);
第一個參數是一個包含檔案編號(file descriptor)的pollfd結構陣列。
第二個參數是檔案個數。
第三個參數是等待時間(毫秒,ms)。
pollfd結構至少包含下列三個成員:
int fd 檔案編號(file descriptor)
short events 輸入事件旗標
short revents 輸出事件旗標
用poll()等待資料更新的使用方法如下:
先訂閱主題並取得回傳值(handler),將回傳值(handler)作為檔案編號(file descriptor)定義pollfd結構陣列,並設定需要偵測的事件為POLLIN,POLLIN為資料可讀取,每當主題上的資料被更新,POLLIN事件即會發生。程式範例如下:
int sensor_sub_fd = orb_subscribe(ORB_ID(snesor_combined));
struct pollfd fds[] = {.fd = sensor_sub_fd, .events = POLLIN};
while(true){
int poll_ret = poll(fds, 1, 1000);
if(poll_ret > 0){
if(fds[0].revents & POLLIN){
... ...
}
}
} - 限制更新速率
//////////////////未完待續//////////////////