From patchwork Sat Jan 21 19:31:01 2023
Content-Type: text/plain; charset="utf-8"
MIME-Version: 1.0
Content-Transfer-Encoding: 8bit
X-Patchwork-Submitter: =?utf-8?q?Maciej_Kr=C3=BCger?=
X-Patchwork-Id: 1730027
From: =?utf-8?q?Maciej_Kr=C3=BCger?=
To: openwrt-devel@lists.openwrt.org
Cc: =?utf-8?q?Maciej_Kr=C3=BCger?=
Subject: [PATCH] ubus: lua: Add support for async calls in lua
Date: Sat, 21 Jan 2023 20:31:01 +0100
Message-Id: <20230121193100.1456140-2-mkg20001@mkg20001.io>
X-Mailer: git-send-email 2.38.1
In-Reply-To: <20230121193100.1456140-1-mkg20001@mkg20001.io>
References: <20230121193100.1456140-1-mkg20001@mkg20001.io>
List-Id: OpenWrt Development List

From: Maciej Krüger

This extends the conn:call function to take a function as its last
parameter, which makes the library use ubus_invoke_async.

This allows streaming the logs from ubus, among other things.

An example has been provided.

Signed-off-by: Maciej Krüger
---
 lua/stream_logs.lua |  29 ++++++++
 lua/ubus.c          | 168 +++++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 195 insertions(+), 2 deletions(-)
 create mode 100644 lua/stream_logs.lua

diff --git a/lua/stream_logs.lua b/lua/stream_logs.lua
new file mode 100644
index 0000000..5490b02
--- /dev/null
+++ b/lua/stream_logs.lua
@@ -0,0 +1,29 @@
+-- Load modules
+require "ubus"
+require "uloop"
+
+uloop.init()
+
+-- Establish connection
+local conn = ubus.connect()
+if not conn then
+	error("Failed to connect to ubusd")
+end
+
+local function handleLog (log)
+	for k, v in pairs(log) do
+		print(k , v)
+	end
+end
+
+-- Stream logs
+local ret = conn:call("log", "read", { stream = true, oneshot = false, lines = 0 }, function (log, control)
+	if control then
+		print('Control event', control.type)
+	else
+		handleLog(log)
+	end
+end)
+
+uloop.run()
+
diff --git a/lua/ubus.c b/lua/ubus.c
index 07b816d..51643df 100644
--- a/lua/ubus.c
+++ b/lua/ubus.c
@@ -19,6 +19,7 @@
 #include
 #include
 #include
+#include
 
 #define MODNAME "ubus"
 #define METANAME MODNAME ".meta"
@@ -42,6 +43,12 @@ struct ubus_lua_event {
 	int r;
 };
 
+struct ubus_lua_request {
+	struct ubus_request r;
+	struct ustream_fd fd;
+	int fnc;
+};
+
 struct ubus_lua_subscriber {
 	struct ubus_subscriber s;
 	int rnotify;
@@ -660,6 +667,134 @@ ubus_lua_call_cb(struct ubus_request *req, int type, struct blob_attr *msg)
 		ubus_lua_parse_blob_array(L, blob_data(msg), blob_len(msg), true);
 }
 
+static void
+ubus_lua_async_complete_cb(struct ubus_request *req, int ret)
+{
+	struct ubus_lua_request *lureq = container_of(req, struct ubus_lua_request, r);
+
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, lureq->fnc);
+	lua_remove(state, -2);
+
+	if (lua_isfunction(state, -1)) {
+		lua_pushnil(state);
+
+		lua_newtable(state);
+
+		lua_pushstring(state, "type");
+		lua_pushstring(state, "connected");
+		lua_settable(state, -3);
+
+		lua_pushstring(state, "return");
+		lua_pushnumber(state, ret);
+		lua_settable(state, -3);
+
+		lua_call(state, 2, 0);
+	} else {
+		lua_pop(state, 1);
+	}
+}
+
+static void
+ubus_lua_async_cb(struct ustream *s, struct blob_attr *msg)
+{
+	struct ubus_lua_request *lureq = container_of(s, struct ubus_lua_request, fd.stream);
+
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, lureq->fnc);
+	lua_remove(state, -2);
+
+	if (lua_isfunction(state, -1)) {
+		if( msg ){
+			ubus_lua_parse_blob_array(state, blob_data(msg), blob_len(msg), true);
+		} else {
+			lua_pushnil(state);
+		}
+		lua_call(state, 1, 0);
+	} else {
+		lua_pop(state, 1);
+	}
+}
+
+static void
+ubus_lua_async_data_cb(struct ustream *s, int bytes)
+{
+	while (true) {
+		struct blob_attr *a;
+		int len, cur_len;
+
+		a = (void*) ustream_get_read_buf(s, &len);
+		if (len < (int)sizeof(*a))
+			break;
+
+		cur_len = blob_len(a) + sizeof(*a);
+		if (len < cur_len)
+			break;
+
+		ubus_lua_async_cb(s, a);
+		ustream_consume(s, cur_len);
+	}
+}
+
+static void
+ubus_lua_async_state_cb(struct ustream *s)
+{
+	struct ubus_lua_request *lureq = container_of(s, struct ubus_lua_request, fd.stream);
+
+	lua_getglobal(state, "__ubus_cb_async");
+	lua_rawgeti(state, -1, lureq->fnc);
+	lua_remove(state, -2);
+
+	if (lua_isfunction(state, -1)) {
+		lua_pushnil(state);
+
+		lua_newtable(state);
+
+		lua_pushstring(state, "type");
+		lua_pushstring(state, "closed");
+		lua_settable(state, -3);
+
+		lua_call(state, 2, 0);
+	} else {
+		lua_pop(state, 1);
+	}
+}
+
+static void
+ubus_lua_async_fd_cb(struct ubus_request *req, int fd)
+{
+	struct ubus_lua_request *lureq = container_of(req, struct ubus_lua_request, r);
+
+	lureq->fd.stream.notify_read = ubus_lua_async_data_cb;
+	lureq->fd.stream.notify_state = ubus_lua_async_state_cb;
+	ustream_fd_init(&lureq->fd, fd);
+}
+
+static int
+ubus_lua_register_async( struct ubus_lua_request ** retlureq, struct ubus_context *ctx, lua_State *L,
+	int fnc )
+{
+	struct ubus_lua_request *lureq;
+
+	lureq = calloc( 1, sizeof( struct ubus_lua_request ) );
+	if( !lureq ){
+		lua_pushstring( L, "Out of memory" );
+		return lua_error(L);
+	}
+
+	lua_getglobal(L, "__ubus_cb_async");
+	lua_pushvalue(L, fnc);
+	lureq->fnc = luaL_ref(L, -2);
+	lua_pop(L, 1);
+
+	// remove the fnc
+	lua_pop(L, 1);
+
+	*retlureq = lureq;
+
+	return 0;
+}
+
 static int
 ubus_lua_call(lua_State *L)
 {
@@ -669,6 +804,20 @@ ubus_lua_call(lua_State *L)
 	const char *path = luaL_checkstring(L, 2);
 	const char *func = luaL_checkstring(L, 3);
 
+	bool isAsync = lua_isfunction(L, 5);
+	struct ubus_lua_request * req = NULL;
+
+	if (isAsync) {
+		int ret = ubus_lua_register_async(&req, c->ctx, L, lua_gettop(L));
+		if (ret) {
+			return ret;
+		}
+		if (!req) {
+			lua_pushstring(L, "Failed to register async callback");
+			return lua_error( L );
+		}
+	}
+
 	luaL_checktype(L, 4, LUA_TTABLE);
 	blob_buf_init(&c->buf, 0);
 
@@ -689,7 +838,14 @@ ubus_lua_call(lua_State *L)
 	}
 
 	top = lua_gettop(L);
-	rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000);
+
+	if (isAsync) {
+		rv = ubus_invoke_async(c->ctx, id, func, c->buf.head, &req->r);
+		req->r.fd_cb = ubus_lua_async_fd_cb;
+		req->r.complete_cb = ubus_lua_async_complete_cb;
+	} else {
+		rv = ubus_invoke(c->ctx, id, func, c->buf.head, ubus_lua_call_cb, L, c->timeout * 1000);
+	}
 
 	if (rv != UBUS_STATUS_OK)
 	{
@@ -699,6 +855,10 @@ ubus_lua_call(lua_State *L)
 		return 2;
 	}
 
+	if (isAsync) {
+		ubus_complete_request_async(c->ctx, &req->r);
+	}
+
 	return lua_gettop(L) - top;
 }
 
@@ -731,7 +891,7 @@ ubus_lua_load_event(lua_State *L)
 
 	event->e.cb = ubus_event_handler;
 
-	/* update the he callback lookup table */
+	/* update the callback lookup table */
 	lua_getglobal(L, "__ubus_cb_event");
 	lua_pushvalue(L, -2);
 	event->r = luaL_ref(L, -2);
@@ -1021,5 +1181,9 @@ luaopen_ubus(lua_State *L)
 	/* create the publisher table - notifications of new subs */
 	lua_createtable(L, 1, 0);
 	lua_setglobal(L, "__ubus_cb_publisher");
+
+	/* create the async table - callbacks for invoke_async */
+	lua_createtable(L, 1, 0);
+	lua_setglobal(L, "__ubus_cb_async");
 	return 0;
 }
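
Note on the framing loop: ubus_lua_async_data_cb in the patch above hands a message to the Lua callback only once the whole blob has arrived in the stream buffer, and leaves a partial message buffered until more data comes in. The standalone sketch below illustrates that pattern and is not part of the patch. It assumes a simplified, hypothetical frame header (a 4-byte big-endian total length that includes the header itself) rather than libubox's struct blob_attr, and the names drain_frames, frame_total_len and count_frame are made up for this example.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical header for this sketch: 4-byte big-endian total frame
 * length, header included. This is NOT libubox's struct blob_attr. */
#define FRAME_HDR_LEN 4

typedef void (*frame_cb)(const uint8_t *payload, size_t payload_len, void *priv);

/* Decode the big-endian total length stored at the start of a frame. */
static uint32_t frame_total_len(const uint8_t *p)
{
	return ((uint32_t)p[0] << 24) | ((uint32_t)p[1] << 16) |
	       ((uint32_t)p[2] << 8) | (uint32_t)p[3];
}

/* Invoke cb once per complete frame found in buf and return the number
 * of bytes consumed. A trailing partial frame is left untouched so the
 * caller can retry after more data has arrived. */
static size_t drain_frames(const uint8_t *buf, size_t len, frame_cb cb, void *priv)
{
	size_t off = 0;

	assert(buf != NULL || len == 0);

	while (len - off >= FRAME_HDR_LEN) {
		uint32_t total = frame_total_len(buf + off);

		/* A length shorter than the header is malformed. Stop here;
		 * a real caller would have to close the stream, since no
		 * further progress is possible. */
		if (total < FRAME_HDR_LEN)
			break;

		/* Header is complete but the payload is not: wait for more. */
		if (len - off < total)
			break;

		cb(buf + off + FRAME_HDR_LEN, total - FRAME_HDR_LEN, priv);
		off += total;
	}

	return off;
}

/* Example callback: counts delivered frames through priv. */
static void count_frame(const uint8_t *payload, size_t payload_len, void *priv)
{
	(void)payload;
	(void)payload_len;
	++*(int *)priv;
}
```

A caller would pass the bytes currently buffered and then drop the returned count from the front of its buffer, much as the patch does with ustream_consume().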