diff --git a/sample/testProducer.py b/sample/testProducer.py index 6938fd2..34023c6 100644 --- a/sample/testProducer.py +++ b/sample/testProducer.py @@ -1,37 +1,39 @@ -#/* -#* Licensed to the Apache Software Foundation (ASF) under one or more -#* contributor license agreements. See the NOTICE file distributed with -#* this work for additional information regarding copyright ownership. -#* The ASF licenses this file to You under the Apache License, Version 2.0 -#* (the "License"); you may not use this file except in compliance with -#* the License. You may obtain a copy of the License at -#* -#* http://www.apache.org/licenses/LICENSE-2.0 -#* -#* Unless required by applicable law or agreed to in writing, software -#* distributed under the License is distributed on an "AS IS" BASIS, -#* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -#* See the License for the specific language governing permissions and -#* limitations under the License. -#*/ +# /* +# * Licensed to the Apache Software Foundation (ASF) under one or more +# * contributor license agreements. See the NOTICE file distributed with +# * this work for additional information regarding copyright ownership. +# * The ASF licenses this file to You under the Apache License, Version 2.0 +# * (the "License"); you may not use this file except in compliance with +# * the License. You may obtain a copy of the License at +# * +# * http://www.apache.org/licenses/LICENSE-2.0 +# * +# * Unless required by applicable law or agreed to in writing, software +# * distributed under the License is distributed on an "AS IS" BASIS, +# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# * See the License for the specific language governing permissions and +# * limitations under the License. +# */ from base import * import time + def initProducer(name): print("---------Create Producer---------------") - producer =CreateProducer(name) - SetProducerNameServerAddress(producer,"172.17.0.2:9876") + producer = CreateProducer(name) + SetProducerNameServerAddress(producer, "172.17.0.2:9876") StartProducer(producer) return producer -def testSendMssage(producer,topic,key,body): + +def testSendMssage(producer, topic, key, body): print("Starting Sending.....") msg = CreateMessage(topic) SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, "ThisMessageTag.") - result = SendMessageSync(producer,msg) + result = SendMessageSync(producer, msg) print(result) print("Msgid:") print(result.GetMsgId()) @@ -42,36 +44,57 @@ def testSendMssage(producer,topic,key,body): DestroyMessage(msg) print("Done...............") + def testSendMessageOneway(producer, topic, key, body): print("Starting Sending(Oneway).....") msg = CreateMessage(topic) SetMessageBody(msg, body) SetMessageKeys(msg, key) SetMessageTags(msg, "Send Message Oneway Test.") - SendMessageOneway(producer,msg) + SendMessageOneway(producer, msg) + DestroyMessage(msg) + print("Done...............") + + +def testSendMssageOrderly(producer, topic, key, body): + print("Starting Sending.....") + msg = CreateMessage(topic) + SetMessageBody(msg, body) + SetMessageKeys(msg, key) + SetMessageTags(msg, "ThisMessageTag.") + result = SendMessageOrderlyByShardingKey(producer, msg, "orderId") + print(result) + print("Msgid:") + print(result.GetMsgId()) + print("Offset:") + print(result.offset) + print("sendStatus:") + print(result.sendStatus) DestroyMessage(msg) print("Done...............") + def releaseProducer(producer): ShutdownProducer(producer) DestroyProducer(producer) print("--------Release producer-----------") + showClientVersion() producer = initProducer("TestPythonProducer") topic = "T_TestTopic" key = "TestKeys" body = "ThisIsTestBody" i = 0 -while i < 10000: +while i < 100: i += 1 - testSendMssage(producer,topic,key,body) - - print("Now Send Message:",i) + testSendMssageOrderly(producer, topic, key, body) + + print("Now Send Message:", i) while i < 10: i += 1 testSendMessageOneway(producer, topic, key, body) - print("Now Send Message One way:",i) + print("Now Send Message One way:", i) releaseProducer(producer) diff --git a/src/PythonWrapper.cpp b/src/PythonWrapper.cpp index f924e56..dec63ce 100644 --- a/src/PythonWrapper.cpp +++ b/src/PythonWrapper.cpp @@ -21,6 +21,7 @@ #include "CProducer.h" #include "CPushConsumer.h" #include "PythonWrapper.h" +#include "CMQException.h" #include #include @@ -115,6 +116,7 @@ const char *PyGetMessageId(PyMessageExt msgExt) { //producer void *PyCreateProducer(const char *groupId) { + PyEval_InitThreads(); // ensure create GIL, for call Python callback from C. return (void *) CreateProducer(groupId); } int PyDestroyProducer(void *producer) { @@ -124,6 +126,7 @@ int PyStartProducer(void *producer) { return StartProducer((CProducer *) producer); } int PyShutdownProducer(void *producer) { + PyThreadStateUnlock PyThreadUnlock; // Shutdown Producer is a block call, ensure thread don't hold GIL. return ShutdownProducer((CProducer *) producer); } int PySetProducerNameServerAddress(void *producer, const char *namesrv) { @@ -138,6 +141,14 @@ int PySetProducerInstanceName(void *producer, const char *instanceName) { int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel) { return SetProducerSessionCredentials((CProducer *)producer, accessKey, secretKey, channel); } +int PySetProducerCompressLevel(void *producer, int level) { + return SetProducerCompressLevel((CProducer *)producer, level); +} +int PySetProducerMaxMessageSize(void *producer, int size) { + return SetProducerMaxMessageSize((CProducer *)producer, size); +} + + PySendResult PySendMessageSync(void *producer, void *msg) { PySendResult ret; CSendResult result; @@ -153,6 +164,44 @@ int PySendMessageOneway(void *producer, void *msg) { return SendMessageOneway((CProducer *) producer, (CMessage *) msg); } +void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){ + PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback + PySendResult sendResult; + sendResult.sendStatus = result.sendStatus; + sendResult.offset = result.offset; + strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); + sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + PyCallback *callback = (PyCallback *)pyCallback; + boost::python::call(callback->successCallback, sendResult, (void *) msg); + delete pyCallback; +} + + +void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){ + PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback + PyMQException exception; + PyCallback *callback = (PyCallback *)pyCallback; + exception.error = e.error; + exception.line = e.line; + strncpy(exception.file, e.file, MAX_EXEPTION_FILE_LENGTH - 1); + exception.file[MAX_EXEPTION_FILE_LENGTH - 1] = 0; + strncpy(exception.msg, e.msg, MAX_EXEPTION_MSG_LENGTH - 1); + exception.msg[MAX_EXEPTION_MSG_LENGTH - 1] = 0; + strncpy(exception.type, e.type, MAX_EXEPTION_TYPE_LENGTH - 1); + exception.type[MAX_EXEPTION_TYPE_LENGTH - 1] = 0; + boost::python::call(callback->exceptionCallback, (void *) msg, exception); + delete pyCallback; +} + +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){ + PyCallback* pyCallback = new PyCallback(); + pyCallback->successCallback = sendSuccessCallback; + pyCallback->exceptionCallback = sendExceptionCallback; + return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback); +} + + + PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) { PySendResult ret; CSendResult result; @@ -171,6 +220,17 @@ int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) { return index; } +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) { + PySendResult ret; + CSendResult result; + SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result); + ret.sendStatus = result.sendStatus; + ret.offset = result.offset; + strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1); + ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0; + return ret; +} + //SendResult const char *PyGetSendResultMsgID(CSendResult &sendResult) { return (const char *) (sendResult.msgId); @@ -282,6 +342,13 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { .def("GetMsgId", &PySendResult::GetMsgId); class_("CMessageExt"); + class_("MQException") + .def_readonly("error", &PyMQException::error, "error") + .def_readonly("line", &PyMQException::line, "line") + .def("GetFile", &PyMQException::GetFile) + .def("GetMsg", &PyMQException::GetMsg) + .def("GetType", &PyMQException::GetType); + //For Message def("CreateMessage", PyCreateMessage, return_value_policy()); def("DestroyMessage", PyDestroyMessage); @@ -310,9 +377,15 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) { def("SetProducerNameServerDomain", PySetProducerNameServerDomain); def("SetProducerInstanceName", PySetProducerInstanceName); def("SetProducerSessionCredentials", PySetProducerSessionCredentials); + def("SetProducerCompressLevel", PySetProducerCompressLevel); + def("SetProducerMaxMessageSize", PySetProducerMaxMessageSize); + def("SendMessageSync", PySendMessageSync); + def("SendMessageAsync", PySendMessageAsync); + def("SendMessageOneway", PySendMessageOneway); def("SendMessageOrderly", PySendMessageOrderly); + def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey); //For Consumer def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy()); diff --git a/src/PythonWrapper.h b/src/PythonWrapper.h index 987d839..c5bc5b9 100644 --- a/src/PythonWrapper.h +++ b/src/PythonWrapper.h @@ -22,6 +22,7 @@ #include "CProducer.h" #include "CPushConsumer.h" #include "CPullConsumer.h" +#include "CMQException.h" #include using namespace boost::python; @@ -36,6 +37,25 @@ typedef struct _PySendResult_ { } } PySendResult; +typedef struct _PyMQException_ { + int error; + int line; + char file[MAX_EXEPTION_FILE_LENGTH]; + char msg[MAX_EXEPTION_MSG_LENGTH]; + char type[MAX_EXEPTION_TYPE_LENGTH]; + + const char *GetFile() { + return (const char *) file; + } + const char *GetMsg() { + return (const char *) msg; + } + const char *GetType() { + return (const char *) type; + } +} PyMQException; + + typedef struct _PyMessageExt_ { CMessageExt *pMessageExt; } PyMessageExt; @@ -45,6 +65,11 @@ typedef struct _PyUserData_ { void *pData; } PyUserData; +typedef struct _PyCallback_ { + PyObject *successCallback; + PyObject *exceptionCallback; +} PyCallback; + #define PYTHON_CLIENT_VERSION "1.2.0" #define PYCLI_BUILD_DATE "04-12-2018" @@ -80,10 +105,20 @@ int PySetProducerNameServerAddress(void *producer, const char *namesrv); int PySetProducerNameServerDomain(void *producer, const char *domain); int PySetProducerInstanceName(void *producer, const char *instanceName); int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel); +int PySetProducerCompressLevel(void *producer, int level); +int PySetProducerMaxMessageSize(void *producer, int size); + PySendResult PySendMessageSync(void *producer, void *msg); int PySendMessageOneway(void *producer, void *msg); -// PySendResult PySendMessageOrderly(void *producer, void *msg , int autoRetryTimes, PyObject *args, PyObject *callback); + +void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback); +void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback); +int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback); + + PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector); +PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey); + int PyOrderlyCallbackInner(int size, CMessage *msg, void *args); //sendResult diff --git a/test/TestSendMessages.py b/test/TestSendMessages.py index 7258d70..142b6d8 100644 --- a/test/TestSendMessages.py +++ b/test/TestSendMessages.py @@ -214,8 +214,47 @@ def send_message_orderly(count): DestroyMessage(msg) print 'msg id =' + result.GetMsgId() +def send_message_orderly_with_shardingkey(count): + key = 'rmq-key' + print 'start sending sharding key order-ly message' + tag = 'test' + for n in range(count): + body = 'hi rmq sharding orderly-message, now is' + str(n) + msg = CreateMessage(topic_orderly) + SetMessageBody(msg, body) + SetMessageKeys(msg, key) + SetMessageTags(msg, tag) + + result = SendMessageOrderlyByShardingKey(producer, msg, 'orderId') + DestroyMessage(msg) + print 'msg id =' + result.GetMsgId() + def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0.... return 0 - + +def send_message_async(count): + key = 'rmq-key' + print 'start sending message' + tag = 'test' + for n in range(count): + body = 'hi rmq message, now is' + str(n) + msg = CreateMessage(topic) + SetMessageBody(msg, body) + SetMessageKeys(msg, key) + SetMessageTags(msg, tag) + + SendMessageAsync(producer, msg, send_message_async_success, send_message_async_fail) + DestroyMessage(msg) + print 'send async message done' + time.sleep(10000) + +def send_message_async_success(result, msg): + print 'send success' + print 'msg id =' + result.GetMsgId() + +def send_message_async_fail(msg, exception): + print 'send message failed' + print 'error msg: ' + exception.GetMsg() + if __name__ == '__main__': send_message_orderly(10)