Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 49 additions & 26 deletions sample/testProducer.py
Original file line number Diff line number Diff line change
@@ -1,37 +1,39 @@
#/*
#* Licensed to the Apache Software Foundation (ASF) under one or more
#* contributor license agreements. See the NOTICE file distributed with
#* this work for additional information regarding copyright ownership.
#* The ASF licenses this file to You under the Apache License, Version 2.0
#* (the "License"); you may not use this file except in compliance with
#* the License. You may obtain a copy of the License at
#*
#* http://www.apache.org/licenses/LICENSE-2.0
#*
#* Unless required by applicable law or agreed to in writing, software
#* distributed under the License is distributed on an "AS IS" BASIS,
#* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#* See the License for the specific language governing permissions and
#* limitations under the License.
#*/
# /*
# * Licensed to the Apache Software Foundation (ASF) under one or more
# * contributor license agreements. See the NOTICE file distributed with
# * this work for additional information regarding copyright ownership.
# * The ASF licenses this file to You under the Apache License, Version 2.0
# * (the "License"); you may not use this file except in compliance with
# * the License. You may obtain a copy of the License at
# *
# * http://www.apache.org/licenses/LICENSE-2.0
# *
# * Unless required by applicable law or agreed to in writing, software
# * distributed under the License is distributed on an "AS IS" BASIS,
# * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# * See the License for the specific language governing permissions and
# * limitations under the License.
# */

from base import *
import time


def initProducer(name):
print("---------Create Producer---------------")
producer =CreateProducer(name)
SetProducerNameServerAddress(producer,"172.17.0.2:9876")
producer = CreateProducer(name)
SetProducerNameServerAddress(producer, "172.17.0.2:9876")
StartProducer(producer)
return producer

def testSendMssage(producer,topic,key,body):

def testSendMssage(producer, topic, key, body):
print("Starting Sending.....")
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, "ThisMessageTag.")
result = SendMessageSync(producer,msg)
result = SendMessageSync(producer, msg)
print(result)
print("Msgid:")
print(result.GetMsgId())
Expand All @@ -42,36 +44,57 @@ def testSendMssage(producer,topic,key,body):
DestroyMessage(msg)
print("Done...............")


def testSendMessageOneway(producer, topic, key, body):
print("Starting Sending(Oneway).....")
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, "Send Message Oneway Test.")
SendMessageOneway(producer,msg)
SendMessageOneway(producer, msg)
DestroyMessage(msg)
print("Done...............")


def testSendMssageOrderly(producer, topic, key, body):
print("Starting Sending.....")
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, "ThisMessageTag.")
result = SendMessageOrderlyByShardingKey(producer, msg, "orderId")
print(result)
print("Msgid:")
print(result.GetMsgId())
print("Offset:")
print(result.offset)
print("sendStatus:")
print(result.sendStatus)
DestroyMessage(msg)
print("Done...............")


def releaseProducer(producer):
ShutdownProducer(producer)
DestroyProducer(producer)
print("--------Release producer-----------")


showClientVersion()
producer = initProducer("TestPythonProducer")
topic = "T_TestTopic"
key = "TestKeys"
body = "ThisIsTestBody"
i = 0
while i < 10000:
while i < 100:
i += 1
testSendMssage(producer,topic,key,body)
print("Now Send Message:",i)
testSendMssageOrderly(producer, topic, key, body)

print("Now Send Message:", i)

while i < 10:
i += 1
testSendMessageOneway(producer, topic, key, body)
print("Now Send Message One way:",i)
print("Now Send Message One way:", i)

releaseProducer(producer)
73 changes: 73 additions & 0 deletions src/PythonWrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "CProducer.h"
#include "CPushConsumer.h"
#include "PythonWrapper.h"
#include "CMQException.h"
#include <boost/python.hpp>
#include <map>

Expand Down Expand Up @@ -115,6 +116,7 @@ const char *PyGetMessageId(PyMessageExt msgExt) {

//producer
void *PyCreateProducer(const char *groupId) {
PyEval_InitThreads(); // ensure create GIL, for call Python callback from C.
return (void *) CreateProducer(groupId);
}
int PyDestroyProducer(void *producer) {
Expand All @@ -124,6 +126,7 @@ int PyStartProducer(void *producer) {
return StartProducer((CProducer *) producer);
}
int PyShutdownProducer(void *producer) {
PyThreadStateUnlock PyThreadUnlock; // Shutdown Producer is a block call, ensure thread don't hold GIL.
return ShutdownProducer((CProducer *) producer);
}
int PySetProducerNameServerAddress(void *producer, const char *namesrv) {
Expand All @@ -138,6 +141,14 @@ int PySetProducerInstanceName(void *producer, const char *instanceName) {
int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel) {
return SetProducerSessionCredentials((CProducer *)producer, accessKey, secretKey, channel);
}
int PySetProducerCompressLevel(void *producer, int level) {
return SetProducerCompressLevel((CProducer *)producer, level);
}
int PySetProducerMaxMessageSize(void *producer, int size) {
return SetProducerMaxMessageSize((CProducer *)producer, size);
}


PySendResult PySendMessageSync(void *producer, void *msg) {
PySendResult ret;
CSendResult result;
Expand All @@ -153,6 +164,44 @@ int PySendMessageOneway(void *producer, void *msg) {
return SendMessageOneway((CProducer *) producer, (CMessage *) msg);
}

void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback){
PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
PySendResult sendResult;
sendResult.sendStatus = result.sendStatus;
sendResult.offset = result.offset;
strncpy(sendResult.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
sendResult.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
PyCallback *callback = (PyCallback *)pyCallback;
boost::python::call<void>(callback->successCallback, sendResult, (void *) msg);
delete pyCallback;
}


void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback){
PyThreadStateLock PyThreadLock; // ensure hold GIL, before call python callback
PyMQException exception;
PyCallback *callback = (PyCallback *)pyCallback;
exception.error = e.error;
exception.line = e.line;
strncpy(exception.file, e.file, MAX_EXEPTION_FILE_LENGTH - 1);
exception.file[MAX_EXEPTION_FILE_LENGTH - 1] = 0;
strncpy(exception.msg, e.msg, MAX_EXEPTION_MSG_LENGTH - 1);
exception.msg[MAX_EXEPTION_MSG_LENGTH - 1] = 0;
strncpy(exception.type, e.type, MAX_EXEPTION_TYPE_LENGTH - 1);
exception.type[MAX_EXEPTION_TYPE_LENGTH - 1] = 0;
boost::python::call<void>(callback->exceptionCallback, (void *) msg, exception);
delete pyCallback;
}

int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback){
PyCallback* pyCallback = new PyCallback();
pyCallback->successCallback = sendSuccessCallback;
pyCallback->exceptionCallback = sendExceptionCallback;
return SendAsync((CProducer *) producer, (CMessage *) msg, &PySendSuccessCallback, &PySendExceptionCallback, (void *)pyCallback);
}



PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector) {
PySendResult ret;
CSendResult result;
Expand All @@ -171,6 +220,17 @@ int PyOrderlyCallbackInner(int size, CMessage *msg, void *args) {
return index;
}

PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey) {
PySendResult ret;
CSendResult result;
SendMessageOrderlyByShardingKey((CProducer *) producer, (CMessage *) msg, shardingKey, &result);
ret.sendStatus = result.sendStatus;
ret.offset = result.offset;
strncpy(ret.msgId, result.msgId, MAX_MESSAGE_ID_LENGTH - 1);
ret.msgId[MAX_MESSAGE_ID_LENGTH - 1] = 0;
return ret;
}

//SendResult
const char *PyGetSendResultMsgID(CSendResult &sendResult) {
return (const char *) (sendResult.msgId);
Expand Down Expand Up @@ -282,6 +342,13 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {
.def("GetMsgId", &PySendResult::GetMsgId);
class_<PyMessageExt>("CMessageExt");

class_<PyMQException>("MQException")
.def_readonly("error", &PyMQException::error, "error")
.def_readonly("line", &PyMQException::line, "line")
.def("GetFile", &PyMQException::GetFile)
.def("GetMsg", &PyMQException::GetMsg)
.def("GetType", &PyMQException::GetType);

//For Message
def("CreateMessage", PyCreateMessage, return_value_policy<return_opaque_pointer>());
def("DestroyMessage", PyDestroyMessage);
Expand Down Expand Up @@ -310,9 +377,15 @@ BOOST_PYTHON_MODULE (librocketmqclientpython) {
def("SetProducerNameServerDomain", PySetProducerNameServerDomain);
def("SetProducerInstanceName", PySetProducerInstanceName);
def("SetProducerSessionCredentials", PySetProducerSessionCredentials);
def("SetProducerCompressLevel", PySetProducerCompressLevel);
def("SetProducerMaxMessageSize", PySetProducerMaxMessageSize);

def("SendMessageSync", PySendMessageSync);
def("SendMessageAsync", PySendMessageAsync);

def("SendMessageOneway", PySendMessageOneway);
def("SendMessageOrderly", PySendMessageOrderly);
def("SendMessageOrderlyByShardingKey", PySendMessageOrderlyByShardingKey);

//For Consumer
def("CreatePushConsumer", PyCreatePushConsumer, return_value_policy<return_opaque_pointer>());
Expand Down
37 changes: 36 additions & 1 deletion src/PythonWrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
#include "CProducer.h"
#include "CPushConsumer.h"
#include "CPullConsumer.h"
#include "CMQException.h"
#include <boost/python.hpp>

using namespace boost::python;
Expand All @@ -36,6 +37,25 @@ typedef struct _PySendResult_ {
}
} PySendResult;

typedef struct _PyMQException_ {
int error;
int line;
char file[MAX_EXEPTION_FILE_LENGTH];
char msg[MAX_EXEPTION_MSG_LENGTH];
char type[MAX_EXEPTION_TYPE_LENGTH];

const char *GetFile() {
return (const char *) file;
}
const char *GetMsg() {
return (const char *) msg;
}
const char *GetType() {
return (const char *) type;
}
} PyMQException;


typedef struct _PyMessageExt_ {
CMessageExt *pMessageExt;
} PyMessageExt;
Expand All @@ -45,6 +65,11 @@ typedef struct _PyUserData_ {
void *pData;
} PyUserData;

typedef struct _PyCallback_ {
PyObject *successCallback;
PyObject *exceptionCallback;
} PyCallback;

#define PYTHON_CLIENT_VERSION "1.2.0"
#define PYCLI_BUILD_DATE "04-12-2018"

Expand Down Expand Up @@ -80,10 +105,20 @@ int PySetProducerNameServerAddress(void *producer, const char *namesrv);
int PySetProducerNameServerDomain(void *producer, const char *domain);
int PySetProducerInstanceName(void *producer, const char *instanceName);
int PySetProducerSessionCredentials(void *producer, const char *accessKey, const char *secretKey, const char *channel);
int PySetProducerCompressLevel(void *producer, int level);
int PySetProducerMaxMessageSize(void *producer, int size);

PySendResult PySendMessageSync(void *producer, void *msg);
int PySendMessageOneway(void *producer, void *msg);
// PySendResult PySendMessageOrderly(void *producer, void *msg , int autoRetryTimes, PyObject *args, PyObject *callback);

void PySendSuccessCallback(CSendResult result, CMessage *msg, void *pyCallback);
void PySendExceptionCallback(CMQException e, CMessage *msg, void *pyCallback);
int PySendMessageAsync(void *producer, void *msg, PyObject *sendSuccessCallback, PyObject *sendExceptionCallback);


PySendResult PySendMessageOrderly(void *producer, void *msg, int autoRetryTimes, void *args, PyObject *queueSelector);
PySendResult PySendMessageOrderlyByShardingKey(void *producer, void *msg, const char *shardingKey);

int PyOrderlyCallbackInner(int size, CMessage *msg, void *args);

//sendResult
Expand Down
41 changes: 40 additions & 1 deletion test/TestSendMessages.py
Original file line number Diff line number Diff line change
Expand Up @@ -214,8 +214,47 @@ def send_message_orderly(count):
DestroyMessage(msg)
print 'msg id =' + result.GetMsgId()

def send_message_orderly_with_shardingkey(count):
key = 'rmq-key'
print 'start sending sharding key order-ly message'
tag = 'test'
for n in range(count):
body = 'hi rmq sharding orderly-message, now is' + str(n)
msg = CreateMessage(topic_orderly)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)

result = SendMessageOrderlyByShardingKey(producer, msg, 'orderId')
DestroyMessage(msg)
print 'msg id =' + result.GetMsgId()

def calc_which_queue_to_send(size, msg, arg): ## it is index start with 0....
return 0


def send_message_async(count):
key = 'rmq-key'
print 'start sending message'
tag = 'test'
for n in range(count):
body = 'hi rmq message, now is' + str(n)
msg = CreateMessage(topic)
SetMessageBody(msg, body)
SetMessageKeys(msg, key)
SetMessageTags(msg, tag)

SendMessageAsync(producer, msg, send_message_async_success, send_message_async_fail)
DestroyMessage(msg)
print 'send async message done'
time.sleep(10000)

def send_message_async_success(result, msg):
print 'send success'
print 'msg id =' + result.GetMsgId()

def send_message_async_fail(msg, exception):
print 'send message failed'
print 'error msg: ' + exception.GetMsg()

if __name__ == '__main__':
send_message_orderly(10)