diff --git a/src/Qubic.vcxproj b/src/Qubic.vcxproj
index 2afd551db..1e766ae71 100644
--- a/src/Qubic.vcxproj
+++ b/src/Qubic.vcxproj
@@ -96,6 +96,7 @@
+
diff --git a/src/Qubic.vcxproj.filters b/src/Qubic.vcxproj.filters
index d76cdb246..2c3df626f 100644
--- a/src/Qubic.vcxproj.filters
+++ b/src/Qubic.vcxproj.filters
@@ -332,6 +332,9 @@
contract_core
+
+ oracle_interfaces
+
@@ -382,4 +385,4 @@
platform
-
+
\ No newline at end of file
diff --git a/src/contract_core/contract_def.h b/src/contract_core/contract_def.h
index e74d49cb1..cb4fbc182 100644
--- a/src/contract_core/contract_def.h
+++ b/src/contract_core/contract_def.h
@@ -441,3 +441,48 @@ static void initializeContracts()
#endif
}
+// Class for registering and looking up user procedures independently of input type, for example for notifications
+class UserProcedureRegistry
+{
+public:
+ struct UserProcedureData
+ {
+ USER_PROCEDURE procedure;
+ unsigned int contractIndex;
+ unsigned int localsSize;
+ unsigned short inputSize;
+ unsigned short outputSize;
+ };
+
+ void init()
+ {
+ setMemory(*this, 0);
+ }
+
+ bool add(unsigned int procedureId, const UserProcedureData& data)
+ {
+ const unsigned int cnt = (unsigned int)idToIndex.population();
+ if (cnt >= idToIndex.capacity())
+ return false;
+
+ copyMemory(userProcData[cnt], data);
+ idToIndex.set(procedureId, cnt);
+
+ return true;
+ }
+
+ const UserProcedureData* get(unsigned int procedureId) const
+ {
+ unsigned int idx;
+ if (!idToIndex.get(procedureId, idx))
+ return nullptr;
+ return userProcData + idx;
+ }
+
+protected:
+ UserProcedureData userProcData[MAX_CONTRACT_PROCEDURES_REGISTERED];
+ QPI::HashMap idToIndex;
+};
+
+// For registering and looking up user procedures independently of input type (for notifications), initialized by initContractExec()
+GLOBAL_VAR_DECL UserProcedureRegistry* userProcedureRegistry GLOBAL_VAR_INIT(nullptr);
diff --git a/src/contract_core/contract_exec.h b/src/contract_core/contract_exec.h
index 5d168e6ba..f6d52fd2b 100644
--- a/src/contract_core/contract_exec.h
+++ b/src/contract_core/contract_exec.h
@@ -192,6 +192,12 @@ static bool initContractExec()
if (!contractActionTracker.allocBuffer())
return false;
+ if (!allocPoolWithErrorLog(L"userProcedureRegistry", sizeof(*userProcedureRegistry), (void**)&userProcedureRegistry, __LINE__))
+ {
+ return false;
+ }
+ userProcedureRegistry->init();
+
return true;
}
@@ -218,6 +224,9 @@ static void deinitContractExec()
freePool(contractStateChangeFlags);
}
+ if (userProcedureRegistry)
+ freePool(userProcedureRegistry);
+
contractActionTracker.freeBuffer();
}
@@ -917,6 +926,16 @@ void QPI::QpiContextForInit::__registerUserProcedure(USER_PROCEDURE userProcedur
contractUserProcedureLocalsSizes[_currentContractIndex][inputType] = localsSize;
}
+void QPI::QpiContextForInit::__registerUserProcedureNotification(USER_PROCEDURE userProcedure, unsigned int procedureId, unsigned short inputSize, unsigned short outputSize, unsigned int localsSize) const
+{
+ ASSERT(userProcedureRegistry);
+ if (!userProcedureRegistry->add(procedureId, { userProcedure, _currentContractIndex, localsSize, inputSize, outputSize }))
+ {
+#if !defined(NDEBUG)
+ addDebugMessage(L"__registerUserProcedureNotification() failed. You should increase MAX_CONTRACT_PROCEDURES_REGISTERED.");
+#endif
+ }
+}
// QPI context used to call contract system procedure from qubic core (contract processor)
@@ -1266,15 +1285,6 @@ struct QpiContextUserFunctionCall : public QPI::QpiContextFunctionCall
};
-struct UserProcedureNotification
-{
- unsigned int contractIndex;
- USER_PROCEDURE procedure;
- const void* inputPtr;
- unsigned short inputSize;
- unsigned int localsSize;
-};
-
// QPI context used to call contract user procedure as a notification from qubic core (contract processor).
// This means, it isn't triggered by a transaction, but following an event after having setup the notification
// callback in the contract code.
@@ -1283,16 +1293,16 @@ struct UserProcedureNotification
// The procedure pointer, the expected inputSize, and the expected localsSize, which are passed via
// UserProcedureNotification, must be consistent. The code using notifications is responible for ensuring that.
// Use cases:
-// - oracle notifications (managed by oracleEngine)
+// - oracle notifications (managed by oracleEngine and userProcedureRegistry)
struct QpiContextUserProcedureNotificationCall : public QPI::QpiContextProcedureCall
{
- QpiContextUserProcedureNotificationCall(const UserProcedureNotification& notification) : QPI::QpiContextProcedureCall(notif.contractIndex, NULL_ID, 0, USER_PROCEDURE_NOTIFICATION_CALL), notif(notification)
+ QpiContextUserProcedureNotificationCall(const UserProcedureRegistry::UserProcedureData& notification) : QPI::QpiContextProcedureCall(notification.contractIndex, NULL_ID, 0, USER_PROCEDURE_NOTIFICATION_CALL), notif(notification)
{
contractActionTracker.init();
}
// Run user procedure notification
- void call()
+ void call(const void* inputPtr)
{
ASSERT(_currentContractIndex < contractCount);
@@ -1330,7 +1340,7 @@ struct QpiContextUserProcedureNotificationCall : public QPI::QpiContextProcedure
__qpiAbort(ContractErrorAllocInputOutputFailed);
}
char* locals = input + notif.inputSize;
- copyMem(input, notif.inputPtr, notif.inputSize);
+ copyMem(input, inputPtr, notif.inputSize);
setMem(locals, notif.localsSize, 0);
// call user procedure
@@ -1353,5 +1363,5 @@ struct QpiContextUserProcedureNotificationCall : public QPI::QpiContextProcedure
}
private:
- const UserProcedureNotification& notif;
+ const UserProcedureRegistry::UserProcedureData& notif;
};
diff --git a/src/contract_core/pre_qpi_def.h b/src/contract_core/pre_qpi_def.h
index 7a5aabd49..e1370a512 100644
--- a/src/contract_core/pre_qpi_def.h
+++ b/src/contract_core/pre_qpi_def.h
@@ -27,6 +27,9 @@ constexpr unsigned short MAX_NESTED_CONTRACT_CALLS = 10;
// Size of the contract action tracker, limits the number of transfers that one contract call can execute.
constexpr unsigned long long CONTRACT_ACTION_TRACKER_SIZE = 16 * 1024 * 1024;
+// Maximum number of contract procedures that may be registered, e.g. for user procedure notifications
+constexpr unsigned int MAX_CONTRACT_PROCEDURES_REGISTERED = 16 * 1024;
+
static void __beginFunctionOrProcedure(const unsigned int); // TODO: more human-readable form of function ID?
static void __endFunctionOrProcedure(const unsigned int);
diff --git a/src/contract_core/qpi_oracle_impl.h b/src/contract_core/qpi_oracle_impl.h
index 14a1af5d3..ac6b9fcaf 100644
--- a/src/contract_core/qpi_oracle_impl.h
+++ b/src/contract_core/qpi_oracle_impl.h
@@ -6,9 +6,10 @@
template
-QPI::sint64 QPI::QpiContextProcedureCall::queryOracle(
+QPI::sint64 QPI::QpiContextProcedureCall::__qpiQueryOracle(
const OracleInterface::OracleQuery& query,
- void (*notificationCallback)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, QPI::OracleNotificationInput& input, QPI::NoData& output, LocalsType& locals),
+ void (*notificationProcPtr)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
+ unsigned int notificationProcId,
uint32 timeoutMillisec
) const
{
@@ -28,9 +29,16 @@ QPI::sint64 QPI::QpiContextProcedureCall::queryOracle(
const QPI::uint16 contractIndex = static_cast(this->_currentContractIndex);
// check callback
- if (!notificationCallback || ContractStateType::__contract_index != contractIndex)
+ if (!notificationProcPtr || ContractStateType::__contract_index != contractIndex)
return -1;
+ // check vs registry of user procedures for notification
+ const UserProcedureRegistry::UserProcedureData* procData;
+ if (!userProcedureRegistry || !(procData = userProcedureRegistry->get(notificationProcId)) || procData->procedure != (USER_PROCEDURE)notificationProcPtr)
+ return -1;
+ ASSERT(procData->inputSize == sizeof(OracleNotificationInput));
+ ASSERT(procData->localsSize == sizeof(LocalsType));
+
// get and destroy fee (not adding to contracts execution fee reserve)
sint64 fee = OracleInterface::getQueryFee(query);
int contractSpectrumIdx = ::spectrumIndex(this->_currentContractId);
@@ -39,8 +47,7 @@ QPI::sint64 QPI::QpiContextProcedureCall::queryOracle(
// try to start query
QPI::sint64 queryId = oracleEngine.startContractQuery(
contractIndex, OracleInterface::oracleInterfaceIndex,
- &query, sizeof(query), timeoutMillisec,
- (USER_PROCEDURE)notificationCallback, sizeof(LocalsType));
+ &query, sizeof(query), timeoutMillisec, notificationProcId);
if (queryId >= 0)
{
// success
@@ -55,17 +62,18 @@ QPI::sint64 QPI::QpiContextProcedureCall::queryOracle(
input->queryId = -1;
QPI::NoData output;
auto* locals = (LocalsType*)__qpiAllocLocals(sizeof(LocalsType));
- notificationCallback(*this, *state, *input, output, *locals);
+ notificationProcPtr(*this, *state, *input, output, *locals);
__qpiFreeLocals();
__qpiFreeLocals();
return -1;
}
template
-inline QPI::sint32 QPI::QpiContextProcedureCall::subscribeOracle(
+inline QPI::sint32 QPI::QpiContextProcedureCall::__qpiSubscribeOracle(
const OracleInterface::OracleQuery& query,
- void (*notificationCallback)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
+ void (*notificationProcPtr)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
QPI::uint32 notificationIntervalInMilliseconds,
+ unsigned int notificationProcId,
bool notifyWithPreviousReply
) const
{
@@ -85,8 +93,7 @@ inline bool QPI::QpiContextProcedureCall::unsubscribeOracle(
template
bool QPI::QpiContextFunctionCall::getOracleQuery(QPI::sint64 queryId, OracleInterface::OracleQuery& query) const
{
- // TODO
- return false;
+ return oracleEngine.getOracleQuery(queryId, &query, sizeof(query));
}
template
diff --git a/src/contracts/TestExampleC.h b/src/contracts/TestExampleC.h
index a75f1e25d..33b4aa501 100644
--- a/src/contracts/TestExampleC.h
+++ b/src/contracts/TestExampleC.h
@@ -211,7 +211,7 @@ struct TESTEXC : public ContractBase
PUBLIC_PROCEDURE(QueryPriceOracle)
{
- output.oracleQueryId = qpi.queryOracle(input.priceOracleQuery, NotifyPriceOracleReply, input.timeoutMilliseconds);
+ output.oracleQueryId = QUERY_ORACLE(OI::Price, input.priceOracleQuery, NotifyPriceOracleReply, input.timeoutMilliseconds);
if (output.oracleQueryId < 0)
{
// error
@@ -234,7 +234,7 @@ struct TESTEXC : public ContractBase
PUBLIC_PROCEDURE(SubscribePriceOracle)
{
- output.oracleSubscriptionId = qpi.subscribeOracle(input.priceOracleQuery, NotifyPriceOracleReply, input.subscriptionIntervalMinutes);
+ output.oracleSubscriptionId = SUBSCRIBE_ORACLE(OI::Price, input.priceOracleQuery, NotifyPriceOracleReply, input.subscriptionIntervalMinutes, true);
if (output.oracleSubscriptionId < 0)
{
// error
@@ -271,18 +271,62 @@ struct TESTEXC : public ContractBase
}
}
+ typedef OracleNotificationInput NotifyMockOracleReply_input;
+ typedef NoData NotifyMockOracleReply_output;
+ struct NotifyMockOracleReply_locals
+ {
+ OI::Mock::OracleQuery query;
+ uint32 queryExtraData;
+ };
+
+ PRIVATE_PROCEDURE_WITH_LOCALS(NotifyMockOracleReply)
+ {
+ if (input.status == ORACLE_QUERY_STATUS_SUCCESS)
+ {
+ // get and use query info if needed
+ if (!qpi.getOracleQuery(input.queryId, locals.query))
+ return;
+
+ ASSERT(locals.query.value == input.reply.echoedValue);
+ ASSERT(locals.query.value == input.reply.doubledValue / 2);
+
+ // TODO: log
+ }
+ else
+ {
+ // handle failure ...
+
+ // TODO: log
+ }
+ }
+
struct END_TICK_locals
{
OI::Price::OracleQuery priceOracleQuery;
+ OI::Mock::OracleQuery mockOracleQuery;
sint64 oracleQueryId;
};
END_TICK_WITH_LOCALS()
{
- // Query oracle
- if (qpi.tick() % 2 == 0)
+ // Query oracles
+ if (qpi.tick() % 10 == 0)
{
- locals.oracleQueryId = qpi.queryOracle(locals.priceOracleQuery, NotifyPriceOracleReply, 20000);
+ // Setup query (in extra scope limit scope of using namespace Ch
+ {
+ using namespace Ch;
+ locals.priceOracleQuery.oracle = OI::Price::getMockOracleId();
+ locals.priceOracleQuery.currency1 = id(B, T, C, null, null);
+ locals.priceOracleQuery.currency2 = id(U, S, D, null, null);
+ locals.priceOracleQuery.timestamp = qpi.now();
+ }
+
+ locals.oracleQueryId = QUERY_ORACLE(OI::Price, locals.priceOracleQuery, NotifyPriceOracleReply, 20000);
+ }
+ if (qpi.tick() % 2 == 1)
+ {
+ locals.mockOracleQuery.value = qpi.tick();
+ QUERY_ORACLE(OI::Mock, locals.mockOracleQuery, NotifyMockOracleReply, 8000);
}
}
@@ -300,5 +344,8 @@ struct TESTEXC : public ContractBase
REGISTER_USER_PROCEDURE(QpiBidInIpo, 30);
REGISTER_USER_PROCEDURE(QueryPriceOracle, 100);
+
+ REGISTER_USER_PROCEDURE_NOTIFICATION(NotifyPriceOracleReply);
+ REGISTER_USER_PROCEDURE_NOTIFICATION(NotifyMockOracleReply);
}
};
diff --git a/src/contracts/qpi.h b/src/contracts/qpi.h
index 1fef17fbd..98a94d4ce 100644
--- a/src/contracts/qpi.h
+++ b/src/contracts/qpi.h
@@ -70,6 +70,22 @@ namespace QPI
constexpr sint64 INVALID_AMOUNT = 0x8000000000000000;
+ // Characters for building strings (for example in constructor of id / m256i)
+ namespace Ch
+ {
+ enum : char
+ {
+ null = 0,
+ space = ' ', slash = '/', backslash = '\\', dot = '.', comma = ',', colon = ':', semicolon = ';',
+ a = 'a', b = 'b', c = 'c', d = 'd', e = 'e', f = 'f', g = 'g', h = 'h', i = 'i', j = 'j', k = 'k', l = 'l', m = 'm',
+ n = 'n', o = 'o', p = 'p', q = 'q', r = 'r', s = 's', t = 't', u = 'u', v = 'v', w = 'w', x = 'x', y = 'y', z = 'z',
+ A = 'A', B = 'B', C = 'C', D = 'D', E = 'E', F = 'F', G = 'G', H = 'H', I = 'I', J = 'J', K = 'K', L = 'L', M = 'M',
+ N = 'N', O = 'O', P = 'P', Q = 'Q', R = 'R', S = 'S', T = 'T', U = 'U', V = 'V', W = 'W', X = 'X', Y = 'Y', Z = 'Z',
+ _0 = '0', _1 = '1', _2 = '2', _3 = '3', _4 = '4', _5 = '5', _6 = '6', _7 = '7', _8 = '8', _9 = '9',
+ };
+ }
+
+ // Letters for defining identity with ID function
constexpr long long _A = 0;
constexpr long long _B = 1;
constexpr long long _C = 2;
@@ -2472,7 +2488,7 @@ namespace QPI
{
sint64 queryId; ///< ID of the oracle query that led to this notification.
uint32 subscriptionId; ///< ID of the oracle subscription or 0 in case of a pure oracle query.
- uint8 status; ///< Oracle query status as defined in `network_messages/common_def.h`
+ uint32 status; ///< Oracle query status as defined in `network_messages/common_def.h`
typename OracleInterface::OracleReply reply; ///< Oracle reply if status == ORACLE_QUERY_STATUS_SUCCESS
};
@@ -2519,32 +2535,6 @@ namespace QPI
uint32 quantity
) const;
- /**
- * @brief Initiate oracle query that will lead to notification later.
- * @param query Details about which oracle to query for which information, as defined by a specific oracle interface.
- * @param notificationCallback User procedure that shall be executed when the oracle reply is available or an error occurs.
- * @param timeoutMillisec Maximum number of milliseconds to wait for reply.
- * @return Oracle query ID that can be used to get the status of the query, or 0 on error.
- *
- * This will automatically burn the oracle query fee as defined by the oracle interface (burning without
- * adding to the contract's execution fee reserve). It will fail if the contract doesn't have enough QU.
- *
- * The notification callback will be executed when the reply is available or on error.
- * The callback must be a user procedure of the contract calling qpi.queryOracle() with the procedure input type
- * OracleNotificationInput and NoData as output.
- * Success is indicated by input.status == ORACLE_QUERY_STATUS_SUCCESS.
- * If an error happened before the query has been created and sent, input.status is ORACLE_QUERY_STATUS_UNKNOWN
- * and input.queryID is -1 (invalid).
- * Other errors that may happen with valid input.queryID are input.status == ORACLE_QUERY_STATUS_TIMEOUT and
- * input.status == ORACLE_QUERY_STATUS_UNRESOLVABLE.
- */
- template
- inline sint64 queryOracle(
- const OracleInterface::OracleQuery& query,
- void (*notificationCallback)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
- uint32 timeoutMillisec = 60000
- ) const;
-
inline sint64 releaseShares(
const Asset& asset,
const id& owner,
@@ -2569,43 +2559,6 @@ namespace QPI
sint64 invocationReward
) const;
- /**
- * @brief Subscribe for regularly querying an oracle.
- * @param query The regular query, which must have a member `DateAndTime timestamp`.
- * @param notificationCallback User procedure that shall be executed when the oracle reply is available or an error occurs.
- * @param notificationIntervalInMilliseconds Number of milliseconds between consecutive queries/replies.
- * This is also used as a timeout. Currently, only multiples of 60000 are supported and other
- * values are rejected with an error.
- * @param notifyWithPreviousReply Whether to immediately notify this contract with the most up-to-date value if any is available.
- * @return Oracle subscription ID that can be used to get the status of the subscription, or -1 on error.
- *
- * Subscriptions automatically expire at the end of each epoch. So, a common pattern is to call qpi.subscribeOracle()
- * in BEGIN_EPOCH.
- *
- * Subscriptions facilitate shareing common oracle queries among multiple contracts. This saves network ressources and allows
- * to provide a fixed-price subscription for the whole epoch, which is usually much cheaper than the equivalent series of
- * individual qpi.queryOracle() calls.
- *
- * The qpi.subscribeOracle() call will automatically burn the oracle subscription fee as defined by the oracle interface
- * (burning without adding to the contract's execution fee reserve). It will fail if the contract doesn't have enough QU.
- *
- * The notification callback will be executed when the reply is available or on error.
- * The callback must be a user procedure of the contract calling qpi.subscribeOracle() with the procedure input type
- * OracleNotificationInput and NoData as output.
- * Success is indicated by input.status == ORACLE_QUERY_STATUS_SUCCESS.
- * If an error happened before the query has been created and sent, input.status is ORACLE_QUERY_STATUS_UNKNOWN
- * and input.queryID is -1 (invalid).
- * Other errors that may happen with valid input.queryID are input.status == ORACLE_QUERY_STATUS_TIMEOUT and
- * input.status == ORACLE_QUERY_STATUS_UNRESOLVABLE.
- */
- template
- inline sint32 subscribeOracle(
- const OracleInterface::OracleQuery& query,
- void (*notificationCallback)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
- uint32 notificationIntervalInMilliseconds = 60000,
- bool notifyWithPreviousReply = true
- ) const;
-
/**
* @brief Add/change/cancel shareholder vote(s) in another contract.
* @param contractIndex Index of the other contract, that SELF is shareholder of and that the proposal is about.
@@ -2654,6 +2607,25 @@ namespace QPI
bool __qpiCallSystemProc(unsigned int otherContractIndex, InputType& input, OutputType& output, sint64 invocationReward) const;
inline void __qpiNotifyPostIncomingTransfer(const id& source, const id& dest, sint64 amount, uint8 type) const;
+ // Internal version of QUERY_ORACLE (macro ensures that proc pointer and id match)
+ template
+ inline sint64 __qpiQueryOracle(
+ const OracleInterface::OracleQuery& query,
+ void (*notificationProcPtr)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
+ unsigned int notificationProcId,
+ uint32 timeoutMillisec
+ ) const;
+
+ // Internal version of SUBSCRIBE_ORACLE (macro ensures that proc pointer and id match)
+ template
+ inline sint32 __qpiSubscribeOracle(
+ const OracleInterface::OracleQuery& query,
+ void (*notificationProcPtr)(const QPI::QpiContextProcedureCall& qpi, ContractStateType& state, OracleNotificationInput& input, NoData& output, LocalsType& locals),
+ unsigned int notificationProcId,
+ uint32 notificationIntervalInMilliseconds = 60000,
+ bool notifyWithPreviousReply = true
+ ) const;
+
// Internal version of transfer() that takes the TransferType as additional argument.
inline sint64 __transfer( // Attempts to transfer energy from this qubic
const id& destination, // Destination to transfer to, use NULL_ID to destroy the transferred energy
@@ -2671,6 +2643,7 @@ namespace QPI
{
inline void __registerUserFunction(USER_FUNCTION, unsigned short, unsigned short, unsigned short, unsigned int) const;
inline void __registerUserProcedure(USER_PROCEDURE, unsigned short, unsigned short, unsigned short, unsigned int) const;
+ inline void __registerUserProcedureNotification(USER_PROCEDURE, unsigned int, unsigned short, unsigned short, unsigned int) const;
// Construction is done in core, not allowed in contracts
inline QpiContextForInit(unsigned int contractIndex);
@@ -2941,7 +2914,7 @@ namespace QPI
#define PRIVATE_PROCEDURE_WITH_LOCALS(procedure) \
private: \
- enum { __is_function_##procedure = false }; \
+ enum { __is_function_##procedure = false, __id_##procedure = (CONTRACT_INDEX << 22) | __LINE__ }; \
inline static void procedure(const QPI::QpiContextProcedureCall& qpi, CONTRACT_STATE_TYPE& state, procedure##_input& input, procedure##_output& output, procedure##_locals& locals) { ::__FunctionOrProcedureBeginEndGuard<(CONTRACT_INDEX << 22) | __LINE__> __prologueEpilogueCaller; __impl_##procedure(qpi, state, input, output, locals); } \
static void __impl_##procedure(const QPI::QpiContextProcedureCall& qpi, CONTRACT_STATE_TYPE& state, procedure##_input& input, procedure##_output& output, procedure##_locals& locals)
@@ -2963,7 +2936,7 @@ namespace QPI
#define PUBLIC_PROCEDURE_WITH_LOCALS(procedure) \
public: \
- enum { __is_function_##procedure = false }; \
+ enum { __is_function_##procedure = false, __id_##procedure = (CONTRACT_INDEX << 22) | __LINE__ }; \
inline static void procedure(const QPI::QpiContextProcedureCall& qpi, CONTRACT_STATE_TYPE& state, procedure##_input& input, procedure##_output& output, procedure##_locals& locals) { ::__FunctionOrProcedureBeginEndGuard<(CONTRACT_INDEX << 22) | __LINE__> __prologueEpilogueCaller; __impl_##procedure(qpi, state, input, output, locals); } \
static void __impl_##procedure(const QPI::QpiContextProcedureCall& qpi, CONTRACT_STATE_TYPE& state, procedure##_input& input, procedure##_output& output, procedure##_locals& locals)
@@ -2989,6 +2962,14 @@ namespace QPI
static_assert(sizeof(userProcedure##_locals) <= MAX_SIZE_OF_CONTRACT_LOCALS, #userProcedure "_locals size too large"); \
qpi.__registerUserProcedure((USER_PROCEDURE)userProcedure, inputType, sizeof(userProcedure##_input), sizeof(userProcedure##_output), sizeof(userProcedure##_locals));
+ // Register procedure for notifications (such as oracle reply notification)
+ #define REGISTER_USER_PROCEDURE_NOTIFICATION(userProcedure) \
+ static_assert(!__is_function_##userProcedure, #userProcedure " is function"); \
+ static_assert(sizeof(userProcedure##_output) <= 65535, #userProcedure "_output size too large"); \
+ static_assert(sizeof(userProcedure##_input) <= 65535, #userProcedure "_input size too large"); \
+ static_assert(sizeof(userProcedure##_locals) <= MAX_SIZE_OF_CONTRACT_LOCALS, #userProcedure "_locals size too large"); \
+ qpi.__registerUserProcedureNotification((USER_PROCEDURE)userProcedure, __id_##userProcedure, sizeof(userProcedure##_input), sizeof(userProcedure##_output), sizeof(userProcedure##_locals));
+
// Call function or procedure of current contract (without changing invocation reward)
// WARNING: input may be changed by called function
#define CALL(functionOrProcedure, input, output) \
@@ -3052,7 +3033,59 @@ namespace QPI
#define INVOKE_OTHER_CONTRACT_PROCEDURE(contractStateType, procedure, input, output, invocationReward) \
INVOKE_OTHER_CONTRACT_PROCEDURE_E(contractStateType, procedure, input, output, invocationReward, interContractCallError)
- #define QUERY_ORACLE(oracle, query) // TODO
+ /**
+ * @brief Initiate oracle query that will lead to notification later.
+ * @param query Details about which oracle to query for which information, as defined by a specific oracle interface.
+ * @param userProcNotification User procedure that shall be executed when the oracle reply is available or an error occurs.
+ * @param timeoutMillisec Maximum number of milliseconds to wait for reply.
+ * @return Oracle query ID that can be used to get the status of the query, or 0 on error.
+ *
+ * This will automatically burn the oracle query fee as defined by the oracle interface (burning without
+ * adding to the contract's execution fee reserve). It will fail if the contract doesn't have enough QU.
+ *
+ * The notification callback will be executed when the reply is available or on error.
+ * The callback must be a user procedure of the contract calling qpi.queryOracle() with the procedure input type
+ * OracleNotificationInput and NoData as output. The procedure must be registered with
+ * REGISTER_USER_PROCEDURE_NOTIFICATION() in REGISTER_USER_FUNCTIONS_AND_PROCEDURES().
+ * Success is indicated by input.status == ORACLE_QUERY_STATUS_SUCCESS.
+ * If an error happened before the query has been created and sent, input.status is ORACLE_QUERY_STATUS_UNKNOWN
+ * and input.queryID is -1 (invalid).
+ * Other errors that may happen with valid input.queryID are input.status == ORACLE_QUERY_STATUS_TIMEOUT and
+ * input.status == ORACLE_QUERY_STATUS_UNRESOLVABLE.
+ */
+ #define QUERY_ORACLE(OracleInterface, query, userProcNotification, timeoutMillisec) qpi.__qpiQueryOracle(query, userProcNotification, __id_##userProcNotification, timeoutMillisec)
+
+ /**
+ * @brief Subscribe for regularly querying an oracle.
+ * @param query The regular query, which must have a member `DateAndTime timestamp`.
+ * @param notificationCallback User procedure that shall be executed when the oracle reply is available or an error occurs.
+ * @param notificationIntervalInMilliseconds Number of milliseconds between consecutive queries/replies.
+ * This is also used as a timeout. Currently, only multiples of 60000 are supported and other
+ * values are rejected with an error.
+ * @param notifyWithPreviousReply Whether to immediately notify this contract with the most up-to-date value if any is available.
+ * @return Oracle subscription ID that can be used to get the status of the subscription, or -1 on error.
+ *
+ * Subscriptions automatically expire at the end of each epoch. So, a common pattern is to call qpi.subscribeOracle()
+ * in BEGIN_EPOCH.
+ *
+ * Subscriptions facilitate shareing common oracle queries among multiple contracts. This saves network ressources and allows
+ * to provide a fixed-price subscription for the whole epoch, which is usually much cheaper than the equivalent series of
+ * individual qpi.queryOracle() calls.
+ *
+ * The qpi.subscribeOracle() call will automatically burn the oracle subscription fee as defined by the oracle interface
+ * (burning without adding to the contract's execution fee reserve). It will fail if the contract doesn't have enough QU.
+ *
+ * The notification callback will be executed when the reply is available or on error.
+ * The callback must be a user procedure of the contract calling qpi.subscribeOracle() with the procedure input type
+ * OracleNotificationInput and NoData as output. The procedure must be registered with
+ * REGISTER_USER_PROCEDURE_NOTIFICATION() in REGISTER_USER_FUNCTIONS_AND_PROCEDURES().
+ * Success is indicated by input.status == ORACLE_QUERY_STATUS_SUCCESS.
+ * If an error happened before the query has been created and sent, input.status is ORACLE_QUERY_STATUS_UNKNOWN
+ * and input.queryID is -1 (invalid).
+ * Other errors that may happen with valid input.queryID are input.status == ORACLE_QUERY_STATUS_TIMEOUT and
+ * input.status == ORACLE_QUERY_STATUS_UNRESOLVABLE.
+ */
+ #define SUBSCRIBE_ORACLE(OracleInterface, query, userProcNotification, notificationIntervalInMilliseconds, notifyWithPreviousReply) qpi.__qpiSubscribeOracle(query, userProcNotification, __id_##userProcNotification, notificationIntervalInMilliseconds, notifyWithPreviousReply)
#define SELF id(CONTRACT_INDEX, 0, 0, 0)
diff --git a/src/logging/logging.h b/src/logging/logging.h
index c0ed6f6d7..d22512e8b 100644
--- a/src/logging/logging.h
+++ b/src/logging/logging.h
@@ -374,6 +374,7 @@ class qLogger
static constexpr unsigned int SC_BEGIN_TICK_TX = NUMBER_OF_TRANSACTIONS_PER_TICK + 2;
static constexpr unsigned int SC_END_TICK_TX = NUMBER_OF_TRANSACTIONS_PER_TICK + 3;
static constexpr unsigned int SC_END_EPOCH_TX = NUMBER_OF_TRANSACTIONS_PER_TICK + 4;
+ static constexpr unsigned int SC_NOTIFICATION_TX = NUMBER_OF_TRANSACTIONS_PER_TICK + 5;
#if ENABLED_LOGGING
// Struct to map log buffer from log id
diff --git a/src/network_messages/common_def.h b/src/network_messages/common_def.h
index e7368043e..e8580e0e7 100644
--- a/src/network_messages/common_def.h
+++ b/src/network_messages/common_def.h
@@ -47,8 +47,8 @@ typedef union m256i
#endif
-constexpr uint16_t MAX_ORACLE_QUERY_SIZE = MAX_INPUT_SIZE - 8;
-constexpr uint16_t MAX_ORACLE_REPLY_SIZE = MAX_INPUT_SIZE - 8;
+constexpr uint16_t MAX_ORACLE_QUERY_SIZE = MAX_INPUT_SIZE - 16;
+constexpr uint16_t MAX_ORACLE_REPLY_SIZE = MAX_INPUT_SIZE - 16;
constexpr uint8_t ORACLE_QUERY_TYPE_CONTRACT_QUERY = 0;
constexpr uint8_t ORACLE_QUERY_TYPE_CONTRACT_SUBSCRIPTION = 1;
@@ -72,8 +72,9 @@ constexpr uint16_t ORACLE_FLAG_OM_ERROR_FLAGS = 0xff; ///< Mask of all error fl
constexpr uint16_t ORACLE_FLAG_REPLY_RECEIVED = 0x100; ///< Oracle engine got valid reply from the oracle machine.
constexpr uint16_t ORACLE_FLAG_BAD_SIZE_REPLY = 0x200; ///< Oracle engine got reply of wrong size from the oracle machine.
constexpr uint16_t ORACLE_FLAG_OM_DISAGREE = 0x400; ///< Oracle engine got different replies from oracle machines.
-constexpr uint16_t ORACLE_FLAG_COMP_DISAGREE = 0x800; ///< The number of reply commits is sufficient (>= 451 computors), but they disagree about the reply value.
-constexpr uint16_t ORACLE_FLAG_TIMEOUT = 0x1000; ///< The weren't enough reply commit tx before timeout (< 451).
+constexpr uint16_t ORACLE_FLAG_COMP_DISAGREE = 0x800; ///< The reply commits differ too much and no quorum can be reached.
+constexpr uint16_t ORACLE_FLAG_TIMEOUT = 0x1000; ///< The weren't enough reply commit tx with the same digest before timeout (< 451).
+constexpr uint16_t ORACLE_FLAG_BAD_SIZE_REVEAL = 0x200; ///< Reply in a reveal tx had wrong size.
typedef union IPv4Address
{
diff --git a/src/network_messages/oracles.h b/src/network_messages/oracles.h
index bb7861375..1ff3f4407 100644
--- a/src/network_messages/oracles.h
+++ b/src/network_messages/oracles.h
@@ -31,7 +31,7 @@ struct RequestOracleData
unsigned int _padding;
// tick, query ID, or subscription ID (depending on reqType)
- unsigned long long reqTickOrId;
+ long long reqTickOrId;
};
static_assert(sizeof(RequestOracleData) == 16, "Something is wrong with the struct size.");
@@ -79,23 +79,25 @@ struct RespondOracleDataQueryMetadata
uint8_t type; ///< contract query, user query, subscription (may be by multiple contracts)
uint8_t status; ///< overall status (pending -> success or timeout)
uint16_t statusFlags; ///< status and error flags (especially as returned by oracle machine connected to this node)
- uint32_t interfaceIndex;
uint32_t queryTick;
- uint64_t timeout; ///< Timeout in QPI::DateAndTime format
m256i queryingEntity;
+ uint64_t timeout; ///< Timeout in QPI::DateAndTime format
+ uint32_t interfaceIndex;
int32_t subscriptionId; ///< -1 is reserved for "no subscription"
uint32_t revealTick; ///< Tick of reveal tx. Only available if status is success.
uint16_t totalCommits; ///< Total number of commit tx. Only available if status isn't success.
uint16_t agreeingCommits; ///< Number of agreeing commit tx (biggest group with same digest). Only available if status isn't success.
};
+static_assert(sizeof(RespondOracleDataQueryMetadata) == 72, "Unexpected struct size");
+
struct RespondOracleDataSubscriptionMetadata
{
- uint16_t queryIntervalMinutes;
- uint16_t queryTimestampOffset;
int64_t lastQueryQueryId;
int64_t lastRevealQueryId;
uint64_t nextQueryTimestamp;
+ uint16_t queryIntervalMinutes;
+ uint16_t queryTimestampOffset;
};
struct RespondOracleDataSubscriptionContractMetadata
diff --git a/src/oracle_core/net_msg_impl.h b/src/oracle_core/net_msg_impl.h
new file mode 100644
index 000000000..03b7ea76b
--- /dev/null
+++ b/src/oracle_core/net_msg_impl.h
@@ -0,0 +1,152 @@
+#pragma once
+
+#include "oracle_core/oracle_engine.h"
+#include "network_messages/oracles.h"
+#include "network_core/peers.h"
+
+template
+void OracleEngine::processRequestOracleData(Peer* peer, RequestResponseHeader* header) const
+{
+ // check input
+ ASSERT(header && peer);
+ ASSERT(header->type() == RequestOracleData::type());
+ if (!header->checkPayloadSize(sizeof(RequestOracleData)))
+ return;
+
+ // prepare buffer
+ constexpr int maxQueryIdCount = 2;
+ constexpr int payloadBufferSize = math_lib::max((int)math_lib::max(MAX_ORACLE_QUERY_SIZE, MAX_ORACLE_REPLY_SIZE), maxQueryIdCount * 8);
+ static_assert(payloadBufferSize >= sizeof(RespondOracleDataQueryMetadata), "Buffer too small.");
+ static_assert(payloadBufferSize < 32 * 1024, "Large alloc in stack may need reconsideration.");
+ uint8_t responseBuffer[sizeof(RespondOracleData) + payloadBufferSize];
+ RespondOracleData* response = (RespondOracleData*)responseBuffer;
+ void* payload = responseBuffer + sizeof(RespondOracleData);
+ int64_t* payloadQueryIds = (int64_t*)(responseBuffer + sizeof(RespondOracleData));
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // process request
+ const RequestOracleData* request = header->getPayload();
+ switch (request->reqType)
+ {
+
+ case RequestOracleData::requestAllQueryIdsByTick:
+ {
+ // TODO
+ break;
+ }
+
+ case RequestOracleData::requestUserQueryIdsByTick:
+ // TODO
+ break;
+
+ case RequestOracleData::requestContractDirectQueryIdsByTick:
+ // TODO
+ break;
+
+ case RequestOracleData::requestContractSubscriptionQueryIdsByTick:
+ // TODO
+ break;
+
+ case RequestOracleData::requestPendingQueryIds:
+ {
+ response->resType = RespondOracleData::respondQueryIds;
+ const unsigned int numMessages = (pendingQueryIndices.numValues + maxQueryIdCount - 1) / maxQueryIdCount;
+ unsigned int idIdx = 0;
+ for (unsigned int msgIdx = 0; msgIdx < numMessages; ++msgIdx)
+ {
+ unsigned int idxInMsg = 0;
+ for (; idxInMsg < maxQueryIdCount && idIdx < pendingQueryIndices.numValues; ++idxInMsg)
+ {
+ payloadQueryIds[idxInMsg] = pendingQueryIndices.values[idIdx];
+ }
+ enqueueResponse(peer, sizeof(RespondOracleData) + idxInMsg * 8,
+ RespondOracleData::type(), header->dejavu(), response);
+ }
+ break;
+ }
+
+ case RequestOracleData::requestQueryAndResponse:
+ {
+ // get query metadata
+ const int64_t queryId = request->reqTickOrId;
+ uint32_t queryIndex;
+ if (queryId < 0 || !queryIdToIndex->get(queryId, queryIndex))
+ break;
+ const OracleQueryMetadata& oqm = queries[queryIndex];
+
+ // prepare metadata response
+ response->resType = RespondOracleData::respondQueryMetadata;
+ auto* payloadOqm = (RespondOracleDataQueryMetadata*)payload;
+ setMemory(*payloadOqm, 0);
+ payloadOqm->queryId = oqm.queryId;
+ payloadOqm->type = oqm.type;
+ payloadOqm->status = oqm.status;
+ payloadOqm->statusFlags = oqm.statusFlags;
+ payloadOqm->queryTick = oqm.queryTick;
+ if (oqm.type == ORACLE_QUERY_TYPE_CONTRACT_QUERY)
+ {
+ payloadOqm->queryingEntity = m256i(oqm.typeVar.contract.queryingContract, 0, 0, 0);
+ }
+ else if (oqm.type == ORACLE_QUERY_TYPE_USER_QUERY)
+ {
+ payloadOqm->queryingEntity = oqm.typeVar.user.queryingEntity;
+ }
+ payloadOqm->timeout = *(uint64_t*)&oqm.timeout;
+ payloadOqm->interfaceIndex = oqm.interfaceIndex;
+ if (oqm.type == ORACLE_QUERY_TYPE_CONTRACT_SUBSCRIPTION)
+ {
+ payloadOqm->subscriptionId = oqm.typeVar.subscription.subscriptionId;
+ }
+ if (oqm.status == ORACLE_QUERY_STATUS_SUCCESS)
+ {
+ payloadOqm->revealTick = oqm.statusVar.success.revealTick;
+ }
+ if (oqm.status == ORACLE_QUERY_STATUS_PENDING || oqm.status == ORACLE_QUERY_STATUS_COMMITTED)
+ {
+ const ReplyState& replyState = replyStates[oqm.statusVar.pending.replyStateIndex];
+ payloadOqm->agreeingCommits = replyState.replyCommitHistogramCount[replyState.mostCommitsHistIdx];
+ payloadOqm->totalCommits = replyState.totalCommits;
+ }
+ else if (oqm.status == ORACLE_QUERY_STATUS_TIMEOUT || oqm.status == ORACLE_QUERY_STATUS_UNRESOLVABLE)
+ {
+ payloadOqm->agreeingCommits = oqm.statusVar.failure.agreeingCommits;
+ payloadOqm->totalCommits = oqm.statusVar.failure.totalCommits;
+ }
+
+ // send metadata response
+ enqueueResponse(peer, sizeof(RespondOracleData) + sizeof(RespondOracleDataQueryMetadata),
+ RespondOracleData::type(), header->dejavu(), response);
+
+ // get and send query data
+ const uint16_t querySize = (uint16_t)OI::oracleInterfaces[oqm.interfaceIndex].querySize;
+ ASSERT(querySize <= payloadBufferSize);
+ if (getOracleQuery(queryId, payload, querySize))
+ {
+ response->resType = RespondOracleData::respondQueryData;
+ enqueueResponse(peer, sizeof(RespondOracleData) + querySize,
+ RespondOracleData::type(), header->dejavu(), response);
+ }
+
+ // get and send reply data
+ if (oqm.status == ORACLE_QUERY_STATUS_SUCCESS)
+ {
+ const uint16_t replySize = (uint16_t)OI::oracleInterfaces[oqm.interfaceIndex].replySize;
+ const void* replyData = getReplyDataFromTickTransactionStorage(oqm);
+ response->resType = RespondOracleData::respondReplyData;
+ enqueueResponse(peer, sizeof(RespondOracleData) + replySize,
+ RespondOracleData::type(), header->dejavu(), response);
+ }
+
+ break;
+ }
+
+ case RequestOracleData::requestSubscription:
+ // TODO
+ break;
+
+ }
+
+ enqueueResponse(peer, 0, EndResponse::type(), header->dejavu(), nullptr);
+}
diff --git a/src/oracle_core/oracle_engine.h b/src/oracle_core/oracle_engine.h
index e4fc97f57..d88ea61d2 100644
--- a/src/oracle_core/oracle_engine.h
+++ b/src/oracle_core/oracle_engine.h
@@ -1,49 +1,56 @@
#pragma once
-#include "network_messages/common_def.h"
+#include "contract_core/pre_qpi_def.h"
#include "contracts/qpi.h"
+#include "oracle_core/oracle_interfaces_def.h"
#include "system.h"
+#include "common_buffers.h"
+#include "spectrum/special_entities.h"
+#include "ticking/tick_storage.h"
#include "oracle_transactions.h"
#include "core_om_network_messages.h"
#include "platform/memory_util.h"
+
void enqueueResponse(Peer* peer, unsigned int dataSize, unsigned char type, unsigned int dejavu, const void* data);
constexpr uint32_t MAX_ORACLE_QUERIES = (1 << 18);
constexpr uint64_t ORACLE_QUERY_STORAGE_SIZE = MAX_ORACLE_QUERIES * 512;
constexpr uint32_t MAX_SIMULTANEOUS_ORACLE_QUERIES = 1024;
+#pragma pack(push, 4)
struct OracleQueryMetadata
{
int64_t queryId; ///< bits 31-62 encode index in tick, bits 0-30 are index in tick, negative values indicate error
uint8_t type; ///< contract query, user query, subscription (may be by multiple contracts)
uint8_t status; ///< overall status (pending -> success or timeout)
uint16_t statusFlags; ///< status and error flags (especially as returned by oracle machine connected to this node)
- uint32_t interfaceIndex;
uint32_t queryTick;
QPI::DateAndTime timeout;
+ uint32_t interfaceIndex;
union
{
struct
{
- m256i queryingEntity;
uint32_t queryTxIndex; // query tx index in tick
+ m256i queryingEntity;
} user;
struct
{
+ uint32_t notificationProcId;
uint64_t queryStorageOffset;
uint16_t queryingContract;
} contract;
struct
{
- uint64_t queryStorageOffset;
uint32_t subscriptionId; ///< 0 is reserved for "no subscription"
+ uint64_t queryStorageOffset;
} subscription;
} typeVar;
@@ -70,6 +77,9 @@ struct OracleQueryMetadata
} failure;
} statusVar;
};
+#pragma pack(pop)
+
+static_assert(sizeof(OracleQueryMetadata) == 72, "Unexpected struct size");
struct OracleSubscriptionContractStatus
@@ -78,6 +88,8 @@ struct OracleSubscriptionContractStatus
uint16_t contractIndex;
uint16_t notificationPeriodMinutes;
QPI::DateAndTime nextNotification;
+ USER_PROCEDURE notificationProcedure;
+ uint32_t notificationLocalsSize;
};
struct OracleSubscriptionMetadata
@@ -101,6 +113,7 @@ struct OracleSubscriptionMetadata
};
// State of received OM reply and computor commits for a single oracle query
+template
struct OracleReplyState
{
int64_t queryId;
@@ -109,23 +122,23 @@ struct OracleReplyState
uint16_t ownReplySize;
uint8_t ownReplyData[MAX_ORACLE_REPLY_SIZE + 2];
+ // track state of own reply commits (when they are scheduled and when actually got executed)
uint16_t ownReplyCommitExecCount;
- uint32 ownReplyCommitComputorTxTick[computorSeedsCount];
- uint32 ownReplyCommitComputorTxExecuted[computorSeedsCount];
+ uint32_t ownReplyCommitComputorTxTick[ownComputorSeedsCount];
+ uint32_t ownReplyCommitComputorTxExecuted[ownComputorSeedsCount];
m256i replyCommitDigests[NUMBER_OF_COMPUTORS];
m256i replyCommitKnowledgeProofs[NUMBER_OF_COMPUTORS];
uint32_t replyCommitTicks[NUMBER_OF_COMPUTORS];
+ uint16_t replyCommitHistogramIdx[NUMBER_OF_COMPUTORS];
+ uint16_t replyCommitHistogramCount[NUMBER_OF_COMPUTORS];
+ uint16_t mostCommitsHistIdx;
uint16_t totalCommits;
- uint16_t mostCommitsCount;
- m256i mostCommitsDigest;
+ uint32_t expectedRevealTxTick;
uint32_t revealTick;
uint32_t revealTxIndex;
-
- USER_PROCEDURE notificationProcedure;
- uint32_t notificationLocalsSize;
};
//
@@ -134,6 +147,14 @@ struct OracleRevenueCounter
uint32_t computorRevPoints[NUMBER_OF_COMPUTORS];
};
+struct OracleNotificationData
+{
+ uint32_t procedureId;
+ uint16_t contractIndex;
+ uint16_t inputSize;
+ uint8_t inputBuffer[16 + MAX_ORACLE_REPLY_SIZE];
+};
+
// Array with fast insert and remove, for which order of entries does not matter. Entry duplicates are possible.
template
struct UnsortedMultiset
@@ -150,7 +171,7 @@ struct UnsortedMultiset
return true;
}
- bool remove(unsigned int idx)
+ bool removeByIndex(unsigned int idx)
{
ASSERT(numValues <= N);
if (idx >= numValues || numValues == 0)
@@ -162,12 +183,27 @@ struct UnsortedMultiset
}
return true;
}
+
+ bool removeByValue(const T& v)
+ {
+ unsigned int idx = 0;
+ bool removedAny = false;
+ while (idx < numValues)
+ {
+ if (values[idx] == v)
+ removedAny = removedAny || removeByIndex(idx);
+ else
+ ++idx;
+ }
+ return removedAny;
+ }
};
-// TODO: locking, implement hash function for queryIdToIndex based on tick
+template
class OracleEngine
{
+protected:
/// array of all oracle queries of the epoch with capacity for MAX_ORACLE_QUERIES elements
OracleQueryMetadata* queries;
@@ -187,8 +223,11 @@ class OracleEngine
uint32_t queryIndexInTick;
} contractQueryIdState;
+ // data type of state of received OM reply and computor commits for single oracle query (used before reveal)
+ typedef OracleReplyState ReplyState;
+
// state of received OM reply and computor commits for each oracle query (used before reveal)
- OracleReplyState* replyStates;
+ ReplyState* replyStates;
// index in replyStates to check next for empty slot (cyclic buffer)
int32_t replyStatesIndex;
@@ -199,9 +238,35 @@ class OracleEngine
/// fast lookup of reply state indices for which commit tx is pending
UnsortedMultiset pendingCommitReplyStateIndices;
+ /// fast lookup of reply state indices for which reveal tx is pending
+ UnsortedMultiset pendingRevealReplyStateIndices;
+
+ // fast lookup of query indices for which the contract should be notified
+ UnsortedMultiset notificationQueryIndicies;
+
+ struct {
+ /// total number of successful oracle queries
+ unsigned long long successCount;
+
+ /// total number of timeout oracle queries
+ unsigned long long timeoutCount;
+
+ /// total number of timeout oracle queries
+ unsigned long long unresolvableCount;
+ } stats;
+
/// fast lookup of oracle query index (sequential position in queries array) from oracle query ID (composed of query tick and other info)
QPI::HashMap* queryIdToIndex;
+ /// array of ownComputorSeedsCount public keys (mainly for testing, in EFI core this points to computorPublicKeys from special_entities.h)
+ const m256i* ownComputorPublicKeys;
+
+ /// buffer used to store output of getNotification()
+ OracleNotificationData notificationOutputBuffer;
+
+ /// lock for preventing race conditions in concurrent execution
+ mutable volatile char lock;
+
/// Return empty reply state slot or max uint32 value on error
uint32_t getEmptyReplyStateSlot()
{
@@ -218,9 +283,19 @@ class OracleEngine
return 0xffffffff;
}
+ void freeReplyStateSlot(uint32_t replyStateIdx)
+ {
+ ASSERT(replyStatesIndex < MAX_SIMULTANEOUS_ORACLE_QUERIES);
+ setMem(&replyStates[replyStateIdx], sizeof(*replyStates), 0);
+ }
+
public:
- bool init()
+ /// Initialize object, passing array of own computor public keys (with number of elements given by template param ownComputorSeedsCount).
+ bool init(const m256i* ownComputorPublicKeys)
{
+ this->ownComputorPublicKeys = ownComputorPublicKeys;
+ lock = 0;
+
// alloc arrays and set to 0
if (!allocPoolWithErrorLog(L"OracleEngine::queries", MAX_ORACLE_QUERIES * sizeof(*queries), (void**)&queries, __LINE__)
|| !allocPoolWithErrorLog(L"OracleEngine::queryStorage", ORACLE_QUERY_STORAGE_SIZE, (void**)&queryStorage, __LINE__)
@@ -231,11 +306,14 @@ class OracleEngine
}
oracleQueryCount = 0;
- queryStorageBytesUsed = 1; // reserve offset 0 for "no data"
+ queryStorageBytesUsed = 8; // reserve offset 0 for "no data"
setMem(&contractQueryIdState, sizeof(contractQueryIdState), 0);
replyStatesIndex = 0;
pendingQueryIndices.numValues = 0;
pendingCommitReplyStateIndices.numValues = 0;
+ pendingRevealReplyStateIndices.numValues = 0;
+ notificationQueryIndicies.numValues = 0;
+ setMem(&stats, sizeof(stats), 0);
return true;
}
@@ -253,11 +331,13 @@ class OracleEngine
void save() const
{
+ LockGuard lockGuard(lock);
// save state (excluding queryIdToIndex and unused parts of large buffers)
}
void load()
{
+ LockGuard lockGuard(lock);
// load state (excluding queryIdToIndex and unused parts of large buffers)
// init queryIdToIndex
}
@@ -273,18 +353,25 @@ class OracleEngine
// ASSERT that tx is in tick storage at tx->tick, txIndex.
// check interface index
// check size of payload vs expected query of given interface
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
// add to query storage
// send query to oracle machine node
}
int64_t startContractQuery(uint16_t contractIndex, uint32_t interfaceIndex,
const void* queryData, uint16_t querySize, uint32_t timeoutMillisec,
- USER_PROCEDURE notificationProcedure, uint32_t notificationLocalsSize)
+ unsigned int notificationProcId)
{
// check inputs
if (contractIndex >= MAX_NUMBER_OF_CONTRACTS || interfaceIndex >= OI::oracleInterfacesCount || querySize != OI::oracleInterfaces[interfaceIndex].querySize)
return -1;
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
// check that still have free capacity for the query
if (oracleQueryCount >= MAX_ORACLE_QUERIES || pendingQueryIndices.numValues >= MAX_SIMULTANEOUS_ORACLE_QUERIES || queryStorageBytesUsed + querySize > ORACLE_QUERY_STORAGE_SIZE)
return -1;
@@ -295,7 +382,7 @@ class OracleEngine
return -1;
// compute timeout as absolute point in time
- DateAndTime timeout = DateAndTime::now();
+ auto timeout = QPI::DateAndTime::now();
if (!timeout.addMillisec(timeoutMillisec))
return -1;
@@ -319,7 +406,7 @@ class OracleEngine
// map ID to index
ASSERT(!queryIdToIndex->contains(queryId));
- if (queryIdToIndex->set(queryId, oracleQueryCount) == NULL_INDEX)
+ if (queryIdToIndex->set(queryId, oracleQueryCount) == QPI::NULL_INDEX)
return -1;
// register index of pending query
@@ -336,14 +423,13 @@ class OracleEngine
queryMetadata.timeout = timeout;
queryMetadata.typeVar.contract.queryingContract = contractIndex;
queryMetadata.typeVar.contract.queryStorageOffset = queryStorageBytesUsed;
+ queryMetadata.typeVar.contract.notificationProcId = notificationProcId;
queryMetadata.statusVar.pending.replyStateIndex = replyStateSlotIdx;
// init reply state (temporary until reply is revealed)
- auto& replyState = replyStates[replyStateSlotIdx];
- setMemory(replyState, 0);
+ ReplyState& replyState = replyStates[replyStateSlotIdx];
+ setMem(&replyState, sizeof(replyState), 0);
replyState.queryId = queryId;
- replyState.notificationProcedure = notificationProcedure;
- replyState.notificationLocalsSize = notificationLocalsSize;
// copy oracle query data to permanent storage
copyMem(queryStorage + queryStorageBytesUsed, queryData, querySize);
@@ -352,11 +438,14 @@ class OracleEngine
// enqueue query message to oracle machine node
enqueueOracleQuery(queryId, interfaceIndex, timeoutMillisec, queryData, querySize);
+ // TODO: send log event ORACLE_QUERY with queryId, query starter, interface, type, status
+
return queryId;
}
+protected:
// Enqueue oracle machine query message. May be called from tick processor or contract processor only (uses reorgBuffer).
- void enqueueOracleQuery(int64_t queryId, uint32_t interfaceIdx, uint16_t timeoutMillisec, const void* queryData, uint16 querySize)
+ static void enqueueOracleQuery(int64_t queryId, uint32_t interfaceIdx, uint32_t timeoutMillisec, const void* queryData, uint16_t querySize)
{
// Prepare message payload
OracleMachineQuery* omq = reinterpret_cast(reorgBuffer);
@@ -369,6 +458,7 @@ class OracleEngine
enqueueResponse((Peer*)0x1, sizeof(*omq) + querySize, OracleMachineQuery::type(), 0, omq);
}
+public:
// CAUTION: Called from request processor, requires locking!
void processOracleMachineReply(const OracleMachineReply* replyMessage, uint32_t replyMessageSize)
{
@@ -377,6 +467,9 @@ class OracleEngine
if (replyMessageSize < sizeof(OracleMachineReply))
return;
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
// get query index
uint32_t queryIndex;
if (!queryIdToIndex->get(replyMessage->oracleQueryId, queryIndex) || queryIndex >= oracleQueryCount)
@@ -407,7 +500,8 @@ class OracleEngine
// get reply state
const auto replyStateIdx = oqm.statusVar.pending.replyStateIndex;
ASSERT(replyStateIdx < MAX_SIMULTANEOUS_ORACLE_QUERIES);
- OracleReplyState& replyState = replyStates[replyStateIdx];
+ ReplyState& replyState = replyStates[replyStateIdx];
+ ASSERT(replyState.queryId == replyMessage->oracleQueryId);
// return if we already got a reply
if (replyState.ownReplySize)
@@ -428,124 +522,594 @@ class OracleEngine
pendingCommitReplyStateIndices.add(replyStateIdx);
}
- /// Return array of reply indices and size of array (as output-by-reference parameter). To be used for getReplyCommitTransactionItem().
- const uint32_t* getPendingReplyCommitTransactionIndices(uint32_t& arraySizeOutput) const
- {
- arraySizeOutput = pendingCommitReplyStateIndices.numValues;
- return pendingCommitReplyStateIndices.values;
- }
-
/**
- * Return commit items in OracleReplyCommitTransaction.
+ * Prepare OracleReplyCommitTransaction in txBuffer, setting all except signature.
+ *
+ * @param txBuffer Buffer for constructing the transaction. Size must be at least MAX_TRANSACTION_SIZE bytes.
* @param computorIdx Index of computor list of computors broadcasted by arbitrator.
* @param ownComputorIdx Index of computor in local array computorSeeds.
- * @param replyIdx Index of reply to consider. Use getPendingReplyCommitTransactionIndices() to get an array of those.
* @param txScheduleTick Tick, in which the transaction is supposed to be scheduled.
- * @param commit Pointer to output buffer of commit data in transaction that is being constructed.
- * @return Whether this computor/reply is supposed to be added to tx. If false, commit is untouched.
+ * @param startIdx Index returned by the previous call of this function if more than one tx is required.
+ * @return 0 if no tx needs to be sent; UINT32_MAX if all pending commits are included in the created tx;
+ * any value in between indicates that another tx needs to be created and should be passed as the start
+ * index for the next call of this function
*
* Called from tick processor.
*/
- bool getReplyCommitTransactionItem(
- uint16_t computorIdx, uint16_t ownComputorIdx,
- int32_t replyIdx, uint32_t txScheduleTick,
- OracleReplyCommitTransactionItem* commit)
+ uint32_t getReplyCommitTransaction(
+ void* txBuffer, uint16_t computorIdx, uint16_t ownComputorIdx,
+ uint32_t txScheduleTick, uint32_t startIdx = 0)
{
// check inputs
- ASSERT(commit);
- if (ownComputorIdx >= computorSeedsCount || computorIdx >= NUMBER_OF_COMPUTORS || replyIdx >= MAX_SIMULTANEOUS_ORACLE_QUERIES)
- return false;
+ ASSERT(txBuffer);
+ if (ownComputorIdx >= ownComputorSeedsCount || computorIdx >= NUMBER_OF_COMPUTORS || txScheduleTick <= system.tick)
+ return 0;
+
+ // init data pointers and reply commit counter
+ auto* tx = reinterpret_cast(txBuffer);
+ auto* commits = reinterpret_cast(tx->inputPtr());
+ uint16_t commitsCount = 0;
+ constexpr uint16_t maxCommitsCount = MAX_INPUT_SIZE / sizeof(OracleReplyCommitTransactionItem);
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // consider queries with pending commit tx, specifically the reply data indices of those
+ const unsigned int replyIdxCount = pendingCommitReplyStateIndices.numValues;
+ const unsigned int* replyIndices = pendingCommitReplyStateIndices.values;
+ unsigned int idx = startIdx;
+ for (; idx < replyIdxCount; ++idx)
+ {
+ // get reply state and check that oracle reply has been received
+ const unsigned int replyIdx = replyIndices[idx];
+ if (replyIdx >= MAX_SIMULTANEOUS_ORACLE_QUERIES)
+ continue;
+ ReplyState& replyState = replyStates[replyIdx];
+ if (replyState.queryId <= 0 || replyState.ownReplySize == 0)
+ continue;
- // get reply state and check that oracle reply has been received
- OracleReplyState& replyState = replyStates[replyIdx];
- if (replyState.queryId == 0 || replyState.ownReplySize == 0)
- return false;
+ // tx already executed or scheduled?
+ if (replyState.ownReplyCommitComputorTxExecuted[ownComputorIdx] ||
+ replyState.ownReplyCommitComputorTxTick[ownComputorIdx] >= system.tick) // TODO: > or >= ?
+ continue;
- // tx already executed or scheduled?
- if (replyState.ownReplyCommitComputorTxExecuted[ownComputorIdx] ||
- replyState.ownReplyCommitComputorTxTick[ownComputorIdx] >= system.tick) // TODO: > or >= ?
- return false;
+ // additional commit required -> leave loop early to finish tx
+ if (commitsCount == maxCommitsCount)
+ break;
- // set known data of commit tx part
- commit->queryId = replyState.queryId;
- commit->replyDigest = replyState.ownReplyDigest;
+ // set known data of commit tx part
+ commits[commitsCount].queryId = replyState.queryId;
+ commits[commitsCount].replyDigest = replyState.ownReplyDigest;
- // compute knowledge proof of commit = K12(oracle reply + computor index)
- ASSERT(replyState.ownReplySize <= MAX_ORACLE_REPLY_SIZE);
- *(uint16_t*)(replyState.ownReplyData + replyState.ownReplySize) = computorIdx;
- KangarooTwelve(replyState.ownReplyData, replyState.ownReplySize + 2, &commit->replyKnowledgeProof, 32);
+ // compute knowledge proof of commit = K12(oracle reply + computor index)
+ ASSERT(replyState.ownReplySize <= MAX_ORACLE_REPLY_SIZE);
+ *(uint16_t*)(replyState.ownReplyData + replyState.ownReplySize) = computorIdx;
+ KangarooTwelve(replyState.ownReplyData, replyState.ownReplySize + 2, &commits[commitsCount].replyKnowledgeProof, 32);
- // signal to schedule tx for given tick
- replyState.ownReplyCommitComputorTxTick[ownComputorIdx] = txScheduleTick;
- return true;
+ // signal to schedule tx for given tick
+ replyState.ownReplyCommitComputorTxTick[ownComputorIdx] = txScheduleTick;
+
+ // we have completed adding this commit
+ ++commitsCount;
+ }
+
+ // no reply commits needed? -> signal to skip tx
+ if (!commitsCount)
+ return 0;
+
+ // finish all of tx except for signature
+ tx->sourcePublicKey = ownComputorPublicKeys[ownComputorIdx];
+ tx->destinationPublicKey = m256i::zero();
+ tx->amount = 0;
+ tx->tick = txScheduleTick;
+ tx->inputType = OracleReplyCommitTransactionPrefix::transactionType();
+ tx->inputSize = commitsCount * sizeof(OracleReplyCommitTransactionItem);
+
+ // if we had to break from the loop early, return and signal to call this again for creating another
+ // tx with the start index we return here
+ if (idx < replyIdxCount)
+ return idx;
+
+ // signal that the tx is ready and the function doesn't need to be called again for more commits
+ return UINT32_MAX;
}
// Called from tick processor.
- void processTransactionOracleReplyCommit(const OracleReplyCommitTransactionPrefix* transaction)
+ bool processOracleReplyCommitTransaction(const OracleReplyCommitTransactionPrefix* transaction)
{
+ // check precondition for calling with ASSERTs
ASSERT(transaction != nullptr);
ASSERT(transaction->checkValidity());
ASSERT(isZero(transaction->destinationPublicKey));
- ASSERT(transaction->tick == system.tick);
+ ASSERT(transaction->inputType == OracleReplyCommitTransactionPrefix::transactionType());
+ // check size of tx
if (transaction->inputSize < OracleReplyCommitTransactionPrefix::minInputSize())
- return;
+ return false;
- int compIdx = computorIndex(transaction->sourcePublicKey);
+ // get computor index
+ const int compIdx = computorIndex(transaction->sourcePublicKey);
if (compIdx < 0)
- return;
+ return false;
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+ // process the N commits in this tx
const OracleReplyCommitTransactionItem* item = (const OracleReplyCommitTransactionItem*)transaction->inputPtr();
uint32_t size = sizeof(OracleReplyCommitTransactionItem);
while (size <= transaction->inputSize)
{
+ // get and check query index
uint32_t queryIndex;
if (!queryIdToIndex->get(item->queryId, queryIndex) || queryIndex >= oracleQueryCount)
continue;
+ // get query metadata and check state
OracleQueryMetadata& oqm = queries[queryIndex];
if (oqm.status != ORACLE_QUERY_STATUS_PENDING && oqm.status != ORACLE_QUERY_STATUS_COMMITTED)
continue;
- // TODO
+ // get reply state
+ const auto replyStateIdx = oqm.statusVar.pending.replyStateIndex;
+ ASSERT(replyStateIdx < MAX_SIMULTANEOUS_ORACLE_QUERIES);
+ ReplyState& replyState = replyStates[replyStateIdx];
+ ASSERT(replyState.queryId == item->queryId);
+
+ // ignore commit if we already have processed a commit by this computor
+ if (replyState.replyCommitTicks[compIdx] != 0)
+ continue;
+ // save reply commit of computor
+ replyState.replyCommitDigests[compIdx] = item->replyDigest;
+ replyState.replyCommitKnowledgeProofs[compIdx] = item->replyKnowledgeProof;
+ replyState.replyCommitTicks[compIdx] = transaction->tick;
+
+ // if tx is from own computor, prevent rescheduling of commit tx
+ for (auto i = 0ull; replyState.ownReplyCommitExecCount < ownComputorSeedsCount && i < ownComputorSeedsCount; ++i)
+ {
+ if (!replyState.ownReplyCommitComputorTxExecuted[i] && ownComputorPublicKeys[i] == transaction->sourcePublicKey)
+ {
+ replyState.ownReplyCommitComputorTxExecuted[i] = transaction->tick;
+ ++replyState.ownReplyCommitExecCount;
+ break;
+ }
+ }
+
+ // update reply commit histogram
+ // 1. search existing or free slot of digest in histogram array
+ uint16_t histIdx = 0;
+ while (replyState.replyCommitHistogramCount[histIdx] != 0 &&
+ item->replyDigest != replyState.replyCommitDigests[replyState.replyCommitHistogramIdx[histIdx]])
+ {
+ ASSERT(histIdx < NUMBER_OF_COMPUTORS);
+ ++histIdx;
+ }
+ // 2. update slot
+ if (replyState.replyCommitHistogramCount[histIdx] == 0)
+ {
+ // first time we see this commit digest
+ replyState.replyCommitHistogramIdx[histIdx] = compIdx;
+ }
+ ++replyState.replyCommitHistogramCount[histIdx];
+ // 3. update variables that trigger reveal
+ ++replyState.totalCommits;
+ if (replyState.replyCommitHistogramCount[histIdx] > replyState.replyCommitHistogramCount[replyState.mostCommitsHistIdx])
+ replyState.mostCommitsHistIdx = histIdx;
+
+ // check if there are enough computor commits for decision
+ const auto mostCommitsCount = replyState.replyCommitHistogramCount[replyState.mostCommitsHistIdx];
+ if (mostCommitsCount >= QUORUM)
+ {
+ // enough commits for the reply reveal transaction
+ // -> switch to status COMMITTED
+ if (oqm.status != ORACLE_QUERY_STATUS_COMMITTED)
+ {
+ oqm.status = ORACLE_QUERY_STATUS_COMMITTED;
+ pendingCommitReplyStateIndices.removeByValue(replyStateIdx);
+ pendingRevealReplyStateIndices.add(replyStateIdx);
+ // TODO: send log event ORACLE_QUERY with queryId, query starter, interface, type, status
+ }
+ }
+ else if (replyState.totalCommits - mostCommitsCount > NUMBER_OF_COMPUTORS - QUORUM)
+ {
+ // more than 1/3 of commits don't vote for most voted digest -> getting quorum isn't possible
+ // -> switch to status UNRESOLVABLE
+ oqm.status = ORACLE_QUERY_STATUS_UNRESOLVABLE;
+ oqm.statusFlags |= ORACLE_FLAG_COMP_DISAGREE;
+ oqm.statusVar.failure.agreeingCommits = mostCommitsCount;
+ oqm.statusVar.failure.totalCommits = replyState.totalCommits;
+ pendingQueryIndices.removeByValue(queryIndex);
+ ++stats.unresolvableCount;
+
+ // cleanup data of pending reply immediately (no info for revenue required)
+ pendingCommitReplyStateIndices.removeByValue(replyStateIdx);
+ freeReplyStateSlot(replyStateIdx);
+
+ // schedule contract notification(s) if needed
+ if (oqm.type != ORACLE_QUERY_TYPE_USER_QUERY)
+ notificationQueryIndicies.add(queryIndex);
+
+ // TODO: send log event ORACLE_QUERY with queryId, query starter, interface, type, status
+ }
+
+ // go to next commit in tx
size += sizeof(OracleReplyCommitTransactionItem);
++item;
}
+
+ return true;
+ }
+
+ /**
+ * Prepare OracleReplyRevealTransaction in txBuffer, setting all except signature.
+ *
+ * @param txBuffer Buffer for constructing the transaction. Size must be at least MAX_TRANSACTION_SIZE bytes.
+ * @param ownComputorIdx Index of computor in local array computorSeeds.
+ * @param txScheduleTick Tick, in which the transaction is supposed to be scheduled.
+ * @param startIdx Index returned by the previous call of this function if more than one tx is required.
+ * @return 0 if no tx needs to be sent; any other value indicates that another tx needs to be created and it
+ * should be passed as the start index for the next call of this function
+ *
+ * Called from tick processor.
+ */
+ uint32_t getReplyRevealTransaction(void* txBuffer, uint16_t ownComputorIdx, uint32_t txScheduleTick, uint32_t startIdx = 0)
+ {
+ // check inputs
+ ASSERT(txBuffer);
+ if (ownComputorIdx >= ownComputorSeedsCount || txScheduleTick <= system.tick)
+ return 0;
+
+ // init data pointer
+ auto* tx = reinterpret_cast(txBuffer);
+ void* txReplyData = tx + 1;
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // consider queries with pending reveal tx, specifically the reply data indices of those
+ const unsigned int replyIdxCount = pendingRevealReplyStateIndices.numValues;
+ const unsigned int* replyIndices = pendingRevealReplyStateIndices.values;
+ unsigned int idx = startIdx;
+ for (; idx < replyIdxCount; ++idx)
+ {
+ // get reply state and check that oracle reply has been received and a quorum has formed about the value
+ const unsigned int replyIdx = replyIndices[idx];
+ if (replyIdx >= MAX_SIMULTANEOUS_ORACLE_QUERIES)
+ continue;
+ ReplyState& replyState = replyStates[replyIdx];
+ if (replyState.queryId <= 0 || replyState.ownReplySize == 0)
+ continue;
+ const uint16_t mostCommitsCount = replyState.replyCommitHistogramCount[replyState.mostCommitsHistIdx];
+ ASSERT(replyState.mostCommitsHistIdx < NUMBER_OF_COMPUTORS && mostCommitsCount <= NUMBER_OF_COMPUTORS);
+ if (mostCommitsCount < QUORUM)
+ continue;
+
+ // tx already scheduled or seen?
+ if (replyState.expectedRevealTxTick >= system.tick) // TODO: > or >= ?
+ continue;
+
+ // check if local view is the quorum view
+ const m256i quorumCommitDigest = replyState.replyCommitDigests[replyState.replyCommitHistogramIdx[replyState.mostCommitsHistIdx]];
+ if (quorumCommitDigest != replyState.ownReplyDigest)
+ continue;
+
+ // set all of tx except for signature
+ tx->sourcePublicKey = ownComputorPublicKeys[ownComputorIdx];
+ tx->destinationPublicKey = m256i::zero();
+ tx->amount = 0;
+ tx->tick = txScheduleTick;
+ tx->inputType = OracleReplyRevealTransactionPrefix::transactionType();
+ tx->inputSize = sizeof(tx->queryId) + replyState.ownReplySize;
+ tx->queryId = replyState.queryId;
+ copyMem(txReplyData, replyState.ownReplyData, replyState.ownReplySize);
+
+ // remember that we have scheduled reveal of this reply
+ replyState.expectedRevealTxTick = txScheduleTick;
+
+ // return non-zero in order instruct caller to call this function again with the returned startIdx
+ return idx + 1;
+ }
+
+ // currently no reply reveal needed -> signal to skip tx
+ return 0;
+ }
+
+protected:
+ // Check oracle reply reveal transaction. Returns reply state if okay or NULL otherwise. Also sets output param queryIndexOutput.
+ // Caller is responsible for locking.
+ ReplyState* checkReplyRevealTransaction(const OracleReplyRevealTransactionPrefix* transaction, uint32_t* queryIndexOutput = nullptr) const
+ {
+ // check precondition for calling with ASSERTs
+ ASSERT(transaction != nullptr);
+ ASSERT(transaction->checkValidity());
+ ASSERT(transaction->inputType == OracleReplyRevealTransactionPrefix::transactionType());
+ ASSERT(isZero(transaction->destinationPublicKey));
+
+ // check size of tx
+ if (transaction->inputSize < OracleReplyRevealTransactionPrefix::minInputSize())
+ return nullptr;
+
+ // check that tx source is computor
+ if (computorIndex(transaction->sourcePublicKey) < 0)
+ return nullptr;
+
+ // get and check query index
+ uint32_t queryIndex;
+ if (!queryIdToIndex->get(transaction->queryId, queryIndex) || queryIndex >= oracleQueryCount)
+ return nullptr;
+
+ // get query metadata and check state
+ OracleQueryMetadata& oqm = queries[queryIndex];
+ if (oqm.status != ORACLE_QUERY_STATUS_COMMITTED)
+ return nullptr;
+
+ // check reply size vs size expected by interface
+ ASSERT(oqm.interfaceIndex < OI::oracleInterfacesCount);
+ const uint16_t replySize = transaction->inputSize - sizeof(transaction->queryId);
+ if (replySize != OI::oracleInterfaces[oqm.interfaceIndex].replySize)
+ {
+ oqm.statusFlags |= ORACLE_FLAG_BAD_SIZE_REVEAL;
+ return nullptr;
+ }
+
+ // get reply state
+ const auto replyStateIdx = oqm.statusVar.pending.replyStateIndex;
+ ASSERT(replyStateIdx < MAX_SIMULTANEOUS_ORACLE_QUERIES);
+ ReplyState& replyState = replyStates[replyStateIdx];
+ ASSERT(replyState.queryId == transaction->queryId);
+
+ // compute digest of reply in reveal tx
+ const void* replyData = transaction + 1;
+ m256i revealDigest;
+ KangarooTwelve(replyData, replySize, revealDigest.m256i_u8, 32);
+
+ // check that revealed reply matches the quorum digest
+ const m256i quorumCommitDigest = replyState.replyCommitDigests[replyState.replyCommitHistogramIdx[replyState.mostCommitsHistIdx]];
+ ASSERT(!isZero(quorumCommitDigest));
+ if (revealDigest != quorumCommitDigest)
+ return nullptr;
+
+ // set output param
+ if (queryIndexOutput)
+ *queryIndexOutput = queryIndex;
+
+ return &replyState;
+ }
+
+ const void* getReplyDataFromTickTransactionStorage(const OracleQueryMetadata& queryMetadata) const
+ {
+ const uint32_t tick = queryMetadata.statusVar.success.revealTick;
+ const uint32_t txSlotInTickData = queryMetadata.statusVar.success.revealTxIndex;
+ ASSERT(txSlotInTickData < NUMBER_OF_TRANSACTIONS_PER_TICK);
+ const unsigned long long* tsTickTransactionOffsets = ts.tickTransactionOffsets.getByTickInCurrentEpoch(tick);
+ const auto* transaction = (OracleReplyRevealTransactionPrefix*)ts.tickTransactions.ptr(tsTickTransactionOffsets[txSlotInTickData]);
+ ASSERT(queryMetadata.queryId == transaction->queryId);
+ ASSERT(queryMetadata.interfaceIndex < OI::oracleInterfacesCount);
+ ASSERT(transaction->inputSize - sizeof(transaction->queryId) == OI::oracleInterfaces[queryMetadata.interfaceIndex].replySize);
+ return transaction + 1;
+ }
+
+public:
+ // Called by request processor when a tx is received in order to minimize sending of reveal tx.
+ void announceExpectedRevealTransaction(const OracleReplyRevealTransactionPrefix* transaction)
+ {
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // check tx and get reply state
+ ReplyState* replyState = checkReplyRevealTransaction(transaction);
+ if (!replyState)
+ return;
+
+ // update tick when reveal is expected
+ if (!replyState->expectedRevealTxTick || replyState->expectedRevealTxTick > transaction->tick)
+ replyState->expectedRevealTxTick = transaction->tick;
+ }
+
+ // Called from tick processor.
+ bool processOracleReplyRevealTransaction(const OracleReplyRevealTransactionPrefix* transaction, uint32_t txSlotInTickData)
+ {
+ ASSERT(txSlotInTickData < NUMBER_OF_TRANSACTIONS_PER_TICK);
+ ASSERT(transaction->tick == system.tick);
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // check tx and get reply state + query metadata
+ uint32_t queryIndex;
+ ReplyState* replyState = checkReplyRevealTransaction(transaction, &queryIndex);
+ if (!replyState)
+ return false;
+ OracleQueryMetadata& oqm = queries[queryIndex];
+ const auto replyStateIdx = oqm.statusVar.pending.replyStateIndex;
+
+ // TODO: check knowledge proofs of all computors and add revenue points for computors who sent correct commit tx fastest
+
+ // update state to SUCCESS
+ oqm.statusVar.success.revealTick = transaction->tick;
+ oqm.statusVar.success.revealTxIndex = txSlotInTickData;
+ oqm.status = ORACLE_QUERY_STATUS_SUCCESS;
+ pendingQueryIndices.removeByValue(queryIndex);
+ ++stats.successCount;
+
+ // cleanup reply state
+ pendingRevealReplyStateIndices.removeByValue(replyStateIdx);
+ freeReplyStateSlot(replyStateIdx);
+
+ // schedule contract notification(s) if needed
+ if (oqm.type != ORACLE_QUERY_TYPE_USER_QUERY)
+ notificationQueryIndicies.add(queryIndex);
+
+ return true;
+ }
+
+ // Called once per tick from the tick processor.
+ void processTimeouts()
+ {
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // consider peinding queries
+ const uint32_t queryIdxCount = pendingQueryIndices.numValues;
+ const uint32_t* queryIndices = pendingQueryIndices.values;
+ const QPI::DateAndTime now = QPI::DateAndTime::now();
+ for (uint32_t i = 0; i < queryIdxCount; ++i)
+ {
+ // get query data
+ const uint32_t queryIndex = queryIndices[i];
+ ASSERT(queryIndex < oracleQueryCount);
+ OracleQueryMetadata& oqm = queries[queryIndex];
+ ASSERT(oqm.status == ORACLE_QUERY_STATUS_PENDING || oqm.status == ORACLE_QUERY_STATUS_COMMITTED);
+
+ // check for timeout
+ if (oqm.timeout < now)
+ {
+ // get reply state
+ const auto replyStateIdx = oqm.statusVar.pending.replyStateIndex;
+ ASSERT(replyStateIdx < MAX_SIMULTANEOUS_ORACLE_QUERIES);
+ ReplyState& replyState = replyStates[replyStateIdx];
+ ASSERT(replyState.queryId == oqm.queryId);
+ const uint16_t mostCommitsCount = replyState.replyCommitHistogramCount[replyState.mostCommitsHistIdx];
+ ASSERT(replyState.mostCommitsHistIdx < NUMBER_OF_COMPUTORS && mostCommitsCount <= NUMBER_OF_COMPUTORS);
+
+ // update state to TIMEOUT
+ oqm.status = ORACLE_QUERY_STATUS_TIMEOUT;
+ oqm.statusFlags |= ORACLE_FLAG_TIMEOUT;
+ oqm.statusVar.failure.agreeingCommits = mostCommitsCount;
+ oqm.statusVar.failure.totalCommits = replyState.totalCommits;
+ pendingQueryIndices.removeByValue(queryIndex);
+ ++stats.timeoutCount;
+
+ // cleanup reply state
+ pendingCommitReplyStateIndices.removeByValue(replyStateIdx);
+ pendingRevealReplyStateIndices.removeByValue(replyStateIdx);
+ freeReplyStateSlot(replyStateIdx);
+
+ // schedule contract notification(s) if needed
+ if (oqm.type != ORACLE_QUERY_TYPE_USER_QUERY)
+ notificationQueryIndicies.add(queryIndex);
+
+ // TODO: send log event ORACLE_QUERY with queryId, query starter, interface, type, status
+ }
+ }
+ }
+
+ /**
+ * @brief Get info for notfying contracts. Call until nullptr is returned.
+ * @return Pointer to notification info or nullptr if no notifications are needed.
+ *
+ * Only to be used in tick processor! No concurrent use supported. Uses one internal buffer for returned data.
+ */
+ const OracleNotificationData* getNotification()
+ {
+ // currently no notifications needed?
+ if (!notificationQueryIndicies.numValues)
+ return nullptr;
+
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
+ // get index and update list
+ const uint32_t queryIndex = notificationQueryIndicies.values[0];
+ notificationQueryIndicies.removeByIndex(0);
+
+ // get query metadata
+ const OracleQueryMetadata& oqm = queries[queryIndex];
+ ASSERT(oqm.type == ORACLE_QUERY_TYPE_CONTRACT_QUERY);
+
+ const auto replySize = OI::oracleInterfaces[oqm.interfaceIndex].replySize;
+ ASSERT(16 + replySize < 0xffff);
+
+ if (oqm.type == ORACLE_QUERY_TYPE_CONTRACT_QUERY)
+ {
+ // setup notification
+ notificationOutputBuffer.contractIndex = oqm.typeVar.contract.queryingContract;
+ notificationOutputBuffer.procedureId = oqm.typeVar.contract.notificationProcId;
+ notificationOutputBuffer.inputSize = (uint16_t)(16 + replySize);
+ setMem(notificationOutputBuffer.inputBuffer, notificationOutputBuffer.inputSize, 0);
+ *(int64_t*)(notificationOutputBuffer.inputBuffer + 0) = oqm.queryId;
+ *(uint32_t*)(notificationOutputBuffer.inputBuffer + 8) = 0;
+ *(uint32_t*)(notificationOutputBuffer.inputBuffer + 12) = oqm.status;
+ if (oqm.status == ORACLE_QUERY_STATUS_SUCCESS)
+ {
+ const void* replySrcPtr = getReplyDataFromTickTransactionStorage(oqm);
+ copyMem(notificationOutputBuffer.inputBuffer + 16, replySrcPtr, replySize);
+ }
+ }
+
+ // TODO: handle subscriptions
+
+ return ¬ificationOutputBuffer;
}
void beginEpoch()
{
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
// TODO
// clean all subscriptions
// clean all queries (except for last n ticks in case of seamless transition)
}
- bool getOracleQuery(uint64_t queryId, const void* queryData, uint16_t querySize) const
+ bool getOracleQuery(int64_t queryId, void* queryData, uint16_t querySize) const
{
+ // lock for accessing engine data
+ LockGuard lockGuard(lock);
+
// get query index
uint32_t queryIndex;
if (!queryIdToIndex->get(queryId, queryIndex) || queryIndex >= oracleQueryCount)
return false;
+ // check query size
const auto& queryMetadata = queries[queryIndex];
- // TODO
+ ASSERT(queryMetadata.interfaceIndex < OI::oracleInterfacesCount);
+ if (querySize != OI::oracleInterfaces[queryMetadata.interfaceIndex].querySize)
+ return false;
+ void* querySrcPtr = nullptr;
+ switch (queryMetadata.type)
+ {
+ case ORACLE_QUERY_TYPE_CONTRACT_QUERY:
+ {
+ const auto offset = queryMetadata.typeVar.contract.queryStorageOffset;
+ ASSERT(offset > 0 && offset < queryStorageBytesUsed && queryStorageBytesUsed <= ORACLE_QUERY_STORAGE_SIZE);
+ querySrcPtr = queryStorage + offset;
+ break;
+ }
+ // TODO: support other types
+ default:
+ return false;
+ }
+
+ // Return query data
+ copyMem(queryData, querySrcPtr, querySize);
return true;
}
void logStatus(CHAR16* message) const
{
- setText(message, L"Oracles queries: ");
+ setText(message, L"Oracles queries: pending ");
appendNumber(message, pendingCommitReplyStateIndices.numValues, FALSE);
appendText(message, " / ");
+ appendNumber(message, pendingRevealReplyStateIndices.numValues, FALSE);
+ appendText(message, " / ");
appendNumber(message, pendingQueryIndices.numValues, FALSE);
- appendText(message, " got replies from OM node");
+ appendText(message, ", successful ");
+ appendNumber(message, stats.successCount, FALSE);
+ appendText(message, ", timeout ");
+ appendNumber(message, stats.timeoutCount, FALSE);
+ appendText(message, ", unresolvable ");
+ appendNumber(message, stats.unresolvableCount, FALSE);
logToConsole(message);
}
+
+ void processRequestOracleData(Peer* peer, RequestResponseHeader* header) const;
};
-GLOBAL_VAR_DECL OracleEngine oracleEngine;
+GLOBAL_VAR_DECL OracleEngine oracleEngine;
/*
- Handle seamless transitions? Keep state?
diff --git a/src/oracle_core/oracle_interfaces_def.h b/src/oracle_core/oracle_interfaces_def.h
index 23c2ff8e1..cb91fb596 100644
--- a/src/oracle_core/oracle_interfaces_def.h
+++ b/src/oracle_core/oracle_interfaces_def.h
@@ -8,7 +8,10 @@ namespace OI
#define ORACLE_INTERFACE_INDEX 0
#include "oracle_interfaces/Price.h"
#undef ORACLE_INTERFACE_INDEX
+
#define ORACLE_INTERFACE_INDEX 1
+#include "oracle_interfaces/Mock.h"
+#undef ORACLE_INTERFACE_INDEX
#define REGISTER_ORACLE_INTERFACE(Interface) {sizeof(Interface::OracleQuery), sizeof(Interface::OracleReply)}
@@ -17,6 +20,7 @@ namespace OI
unsigned long long replySize;
} oracleInterfaces[] = {
REGISTER_ORACLE_INTERFACE(Price),
+ REGISTER_ORACLE_INTERFACE(Mock),
};
static constexpr uint32_t oracleInterfacesCount = sizeof(oracleInterfaces) / sizeof(oracleInterfaces[0]);
diff --git a/src/oracle_core/oracle_transactions.h b/src/oracle_core/oracle_transactions.h
index 42e9c6f48..5baf3f3ab 100644
--- a/src/oracle_core/oracle_transactions.h
+++ b/src/oracle_core/oracle_transactions.h
@@ -23,6 +23,8 @@ struct OracleReplyCommitTransactionPrefix : public Transaction
{
return sizeof(OracleReplyCommitTransactionItem);
}
+
+ // followed by: n times OracleReplyCommitTransactionItem
};
// Transaction for revealing oracle reply. The tx prefix is followed by the OracleReply data
@@ -40,6 +42,8 @@ struct OracleReplyRevealTransactionPrefix : public Transaction
}
unsigned long long queryId;
+
+ // followed by: oracle reply
};
// Transaction for querying oracle. The tx prefix is followed by the OracleQuery data
diff --git a/src/oracle_interfaces/Mock.h b/src/oracle_interfaces/Mock.h
new file mode 100644
index 000000000..4ad135cad
--- /dev/null
+++ b/src/oracle_interfaces/Mock.h
@@ -0,0 +1,44 @@
+using namespace QPI;
+
+/**
+* Oracle interface "Mock" (see Price.h for general documentation about oracle interfaces).
+*
+* This is for useful testing the oracle machine and the core logic without involving external services.
+*/
+struct Mock
+{
+ //-------------------------------------------------------------------------
+ // Mandatory oracle interface definitions
+
+ /// Oracle interface index
+ static constexpr uint32 oracleInterfaceIndex = ORACLE_INTERFACE_INDEX;
+
+ /// Oracle query data / input to the oracle machine
+ struct OracleQuery
+ {
+ /// Value that processed
+ uint64 value;
+ };
+
+ /// Oracle reply data / output of the oracle machine
+ struct OracleReply
+ {
+ /// Value given in query
+ uint64 echoedValue;
+
+ // 2 * value given in query
+ uint64 doubledValue;
+ };
+
+ /// Return query fee, which may depend on the specific query (for example on the oracle).
+ static sint64 getQueryFee(const OracleQuery& query)
+ {
+ return 10;
+ }
+
+ /// Return subscription fee, which may depend on query and interval.
+ static sint64 getSubscriptionFee(const OracleQuery& query, uint16 notifyIntervalInMinutes)
+ {
+ return 1000;
+ }
+};
diff --git a/src/oracle_interfaces/Price.h b/src/oracle_interfaces/Price.h
index fba931343..70bc92a48 100644
--- a/src/oracle_interfaces/Price.h
+++ b/src/oracle_interfaces/Price.h
@@ -96,4 +96,18 @@ struct Price
// TODO:
// implement and test currency conversion (including using uint128 on the way in order to support large quantities)
// provide character enum / id constructor for convenient setting of oracle/currency IDs
+
+ /// Get oracle ID of mock oracle
+ static id getMockOracleId()
+ {
+ using namespace Ch;
+ return id(m, o, c, k, null);
+ }
+
+ /// Get oracle ID of coingecko oracle
+ static id getCoingeckoOracleId()
+ {
+ using namespace Ch;
+ return id(c, o, i, n, g, e, c, k, o);
+ }
};
diff --git a/src/platform/concurrency.h b/src/platform/concurrency.h
index 4cdf5d3df..9bdeb8ff7 100644
--- a/src/platform/concurrency.h
+++ b/src/platform/concurrency.h
@@ -43,6 +43,23 @@ class BusyWaitingTracker
// Release lock
#define RELEASE(lock) lock = 0
+// Create an object of this class to lock until the end of the life-time of this object.
+// Usually used on stack for making sure that the lock is released, no matter which way the function is left.
+struct LockGuard
+{
+ LockGuard(volatile char& lock) : _lock(lock)
+ {
+ ACQUIRE(_lock);
+ }
+
+ ~LockGuard()
+ {
+ RELEASE(_lock);
+ }
+
+ volatile char& _lock;
+};
+
#ifdef NDEBUG
diff --git a/src/platform/m256.h b/src/platform/m256.h
index 09a57af9e..fa78adba4 100644
--- a/src/platform/m256.h
+++ b/src/platform/m256.h
@@ -62,6 +62,17 @@ union m256i
_mm256_set_epi64x(ull3, ull2, ull1, ull0));
}
+ m256i(
+ char c0, char c1, char c2, char c3, char c4, char c5 = 0, char c6 = 0, char c7 = 0, char c8 = 0, char c9 = 0,
+ char c10 = 0, char c11 = 0, char c12 = 0, char c13 = 0, char c14 = 0, char c15 = 0, char c16 = 0, char c17 = 0,
+ char c18 = 0, char c19 = 0, char c20 = 0, char c21 = 0, char c22 = 0, char c23 = 0, char c24 = 0, char c25 = 0,
+ char c26 = 0, char c27 = 0, char c28 = 0, char c29 = 0, char c30 = 0, char c31 = 0)
+ {
+ _mm256_storeu_si256(reinterpret_cast<__m256i*>(this),
+ _mm256_set_epi8(c31, c30, c29, c28, c27, c26, c25, c24, c23, c22, c21, c20, c19, c18, c17, c16, c15, c14, c13, c12,
+ c11, c10, c9, c8, c7, c6, c5, c4, c3, c2, c1, c0));
+ }
+
m256i(const unsigned char value[32])
{
_mm256_storeu_si256(reinterpret_cast<__m256i*>(this),
diff --git a/src/qubic.cpp b/src/qubic.cpp
index 6fffe1351..b7d63f5d9 100644
--- a/src/qubic.cpp
+++ b/src/qubic.cpp
@@ -58,6 +58,8 @@
#include "logging/net_msg_impl.h"
#include "ticking/ticking.h"
+#include "ticking/tick_storage.h"
+#include "ticking/pending_txs_pool.h"
#include "contract_core/qpi_ticking_impl.h"
#include "vote_counter.h"
#include "ticking/execution_fee_report_collector.h"
@@ -136,7 +138,6 @@ static unsigned short numberOfOwnComputorIndices;
static unsigned short ownComputorIndices[computorSeedsCount];
static unsigned short ownComputorIndicesMapping[computorSeedsCount];
-static TickStorage ts;
static VoteCounter voteCounter;
static ExecutionFeeReportCollector executionFeeReportCollector;
static TickData nextTickData;
@@ -157,7 +158,8 @@ static unsigned int contractProcessorPhase;
static const Transaction* contractProcessorTransaction = 0; // does not have signature in some cases, see notifyContractOfIncomingTransfer()
static int contractProcessorTransactionMoneyflew = 0;
static unsigned char contractProcessorPostIncomingTransferType = 0;
-static const UserProcedureNotification* contractProcessorUserProcedureNotification = 0;
+static const UserProcedureRegistry::UserProcedureData* contractProcessorUserProcedureNotificationProc = 0;
+static const void* contractProcessorUserProcedureNotificationInput = 0;
static EFI_EVENT contractProcessorEvent;
static m256i contractStateDigests[MAX_NUMBER_OF_CONTRACTS * 2 - 1];
const unsigned long long contractStateDigestsSizeInBytes = sizeof(contractStateDigests);
@@ -995,6 +997,14 @@ static void processBroadcastTransaction(Peer* peer, RequestResponseHeader* heade
}
}
ts.tickData.releaseLock();
+
+ // shortcut: oracle reply reveal transactions are analyzed immediately after receiving them (before execution of the tx),
+ // in order to minimize the number of reveal transaction (one per oracle query is enough, so no reveal tx is generated
+ // after one has been seen)
+ if (isZero(request->destinationPublicKey) && request->inputType == OracleReplyRevealTransactionPrefix::transactionType())
+ {
+ oracleEngine.announceExpectedRevealTransaction((OracleReplyRevealTransactionPrefix*)request);
+ }
}
}
}
@@ -2192,11 +2202,13 @@ static void requestProcessor(void* ProcedureArgument)
processRequestAssets(peer, header);
}
break;
+
case RequestCustomMiningSolutionVerification::type():
{
processRequestedCustomMiningSolutionVerificationRequest(peer, header);
}
break;
+
case RequestCustomMiningData::type():
{
processCustomMiningDataRequest(peer, processorNumber, header);
@@ -2213,6 +2225,13 @@ static void requestProcessor(void* ProcedureArgument)
{
processOracleMachineReply(peer, header);
}
+ break;
+
+ case RequestOracleData::type():
+ {
+ oracleEngine.processRequestOracleData(peer, header);
+ }
+ break;
#if ADDON_TX_STATUS_REQUEST
/* qli: process RequestTxStatus message */
@@ -2431,15 +2450,17 @@ static void contractProcessor(void*)
case USER_PROCEDURE_NOTIFICATION_CALL:
{
- const auto* notification = contractProcessorUserProcedureNotification;
- ASSERT(notification && notification->procedure && notification->inputPtr);
+ const auto* notification = contractProcessorUserProcedureNotificationProc;
+ ASSERT(notification && notification->procedure);
ASSERT(notification->inputSize <= MAX_INPUT_SIZE);
ASSERT(notification->localsSize <= MAX_SIZE_OF_CONTRACT_LOCALS);
+ ASSERT(contractProcessorUserProcedureNotificationInput);
QpiContextUserProcedureNotificationCall qpiContext(*notification);
- qpiContext.call();
+ qpiContext.call(contractProcessorUserProcedureNotificationInput);
- contractProcessorUserProcedureNotification = 0;
+ contractProcessorUserProcedureNotificationProc = 0;
+ contractProcessorUserProcedureNotificationInput = 0;
}
break;
}
@@ -2751,28 +2772,17 @@ static void processTickTransactionSolution(const MiningSolutionTransaction* tran
}
}
-
-static void processTickTransactionOracleReplyReveal(const OracleReplyRevealTransactionPrefix* transaction)
+static void processTickTransaction(const Transaction* transaction, unsigned int transactionIndex, unsigned long long processorNumber)
{
PROFILE_SCOPE();
ASSERT(nextTickData.epoch == system.epoch);
ASSERT(transaction != nullptr);
ASSERT(transaction->checkValidity());
- ASSERT(isZero(transaction->destinationPublicKey));
ASSERT(transaction->tick == system.tick);
- // TODO
-}
-
-static void processTickTransaction(const Transaction* transaction, const m256i& transactionDigest, const m256i& dataLock, unsigned long long processorNumber)
-{
- PROFILE_SCOPE();
-
- ASSERT(nextTickData.epoch == system.epoch);
- ASSERT(transaction != nullptr);
- ASSERT(transaction->checkValidity());
- ASSERT(transaction->tick == system.tick);
+ const m256i& transactionDigest = nextTickData.transactionDigests[transactionIndex];
+ const m256i& dataLock = nextTickData.timelock;
// Record the tx with digest
ts.transactionsDigestAccess.acquireLock();
@@ -2852,18 +2862,13 @@ static void processTickTransaction(const Transaction* transaction, const m256i&
case OracleReplyCommitTransactionPrefix::transactionType():
{
- oracleEngine.processTransactionOracleReplyCommit((OracleReplyCommitTransactionPrefix*)transaction);
+ oracleEngine.processOracleReplyCommitTransaction((OracleReplyCommitTransactionPrefix*)transaction);
}
break;
case OracleReplyRevealTransactionPrefix::transactionType():
{
- if (computorIndex(transaction->sourcePublicKey) >= 0
- && transaction->inputSize >= OracleReplyRevealTransactionPrefix::minInputSize())
- {
- // TODO: fix size check by defining minInputSize
- processTickTransactionOracleReplyReveal((OracleReplyRevealTransactionPrefix*)transaction);
- }
+ oracleEngine.processOracleReplyRevealTransaction((OracleReplyRevealTransactionPrefix*)transaction, transactionIndex);
}
break;
@@ -3246,7 +3251,7 @@ static void processTick(unsigned long long processorNumber)
{
Transaction* transaction = ts.tickTransactions(tsCurrentTickTransactionOffsets[transactionIndex]);
logger.registerNewTx(transaction->tick, transactionIndex);
- processTickTransaction(transaction, nextTickData.transactionDigests[transactionIndex], nextTickData.timelock, processorNumber);
+ processTickTransaction(transaction, transactionIndex, processorNumber);
}
else
{
@@ -3260,6 +3265,28 @@ static void processTick(unsigned long long processorNumber)
PROFILE_SCOPE_END();
}
+ // Check for oracle query timeouts (may schedule notification)
+ oracleEngine.processTimeouts();
+
+ // Notify contracts about successfully obtained oracle replies and about errors (using contract processor)
+ const OracleNotificationData* oracleNotification = oracleEngine.getNotification();
+ while (oracleNotification)
+ {
+ PROFILE_NAMED_SCOPE("processTick(): run oracle contract notification");
+ logger.registerNewTx(system.tick, logger.SC_NOTIFICATION_TX);
+ contractProcessorUserProcedureNotificationProc = userProcedureRegistry->get(oracleNotification->procedureId);
+ contractProcessorUserProcedureNotificationInput = oracleNotification->inputBuffer;
+ ASSERT(contractProcessorUserProcedureNotificationProc);
+ ASSERT(contractProcessorUserProcedureNotificationProc->contractIndex == oracleNotification->contractIndex);
+ ASSERT(contractProcessorUserProcedureNotificationProc->inputSize == oracleNotification->inputSize);
+ ASSERT(contractProcessorUserProcedureNotificationInput);
+ contractProcessorPhase = USER_PROCEDURE_NOTIFICATION_CALL;
+ contractProcessorState = 1;
+ WAIT_WHILE(contractProcessorState);
+
+ oracleNotification = oracleEngine.getNotification();
+ }
+
// The last executionFeeReport for the previous phase is published by comp (0-indexed) in the last tick t1 of the
// previous phase (t1 % NUMBER_OF_COMPUTORS == NUMBER_OF_COMPUTORS - 1) for inclusion in tick t2 = t1 + TICK_TRANSACTIONS_PUBLICATION_OFFSET.
// Tick t2 corresponds to tick of the current phase.
@@ -3474,6 +3501,55 @@ static void processTick(unsigned long long processorNumber)
}
}
+ // Publish oracle reply commit and reveal transactions (uses reorgBuffer for constructing packets)
+ if (isMainMode())
+ {
+ const auto txTick = system.tick + TICK_TRANSACTIONS_PUBLICATION_OFFSET;
+ unsigned char digest[32];
+ {
+ PROFILE_NAMED_SCOPE("processTick(): broadcast oracle reply transactions");
+ auto* tx = (OracleReplyCommitTransactionPrefix*)reorgBuffer;
+ for (unsigned int i = 0; i < numberOfOwnComputorIndices; i++)
+ {
+ const auto ownCompIdx = ownComputorIndicesMapping[i];
+ const auto overallCompIdx = ownComputorIndices[i];
+ unsigned int retCode = 0;
+ do
+ {
+ // create reply commit transaction in tx (without signature), returning:
+ // - 0 if no tx was created (no need to send reply commits)
+ // - UINT32_MAX if we all pending reply commits fitted into this one tx
+ // - otherwise, an index value that has to be passed to the next call for building another tx
+ retCode = oracleEngine.getReplyCommitTransaction(tx, overallCompIdx, ownCompIdx, txTick, retCode);
+ if (!retCode)
+ break;
+
+ // sign and broadcast tx
+ KangarooTwelve(tx, sizeof(Transaction) + tx->inputSize, digest, sizeof(digest));
+ sign(computorSubseeds[i].m256i_u8, computorPublicKeys[i].m256i_u8, digest, tx->signaturePtr());
+ enqueueResponse(NULL, tx->totalSize(), BROADCAST_TRANSACTION, 0, tx);
+ }
+ while (retCode != UINT32_MAX);
+ }
+ }
+
+ {
+ PROFILE_NAMED_SCOPE("processTick(): broadcast oracle reveal transactions");
+ auto* tx = (OracleReplyRevealTransactionPrefix*)reorgBuffer;
+ // create reply reveal transaction in tx (without signature), returning:
+ // - 0 if no tx was created (no need to send reply commits)
+ // - otherwise, an index value that has to be passed to the next call for building another tx
+ unsigned int retCode = 0;
+ while ((retCode = oracleEngine.getReplyRevealTransaction(tx, 0, txTick, retCode)) != 0)
+ {
+ // sign and broadcast tx
+ KangarooTwelve(tx, sizeof(Transaction) + tx->inputSize, digest, sizeof(digest));
+ sign(computorSubseeds[0].m256i_u8, computorPublicKeys[0].m256i_u8, digest, tx->signaturePtr());
+ enqueueResponse(NULL, tx->totalSize(), BROADCAST_TRANSACTION, 0, tx);
+ }
+ }
+ }
+
if (isMainMode())
{
// Publish solutions that were sent via BroadcastMessage as MiningSolutionTransaction
@@ -5770,7 +5846,7 @@ static bool initialize()
}
}
- if (!oracleEngine.init())
+ if (!oracleEngine.init(computorPublicKeys))
return false;
#ifdef INCLUDE_CONTRACT_TEST_EXAMPLES
diff --git a/src/ticking/tick_storage.h b/src/ticking/tick_storage.h
index 3c9116e87..7dde48944 100644
--- a/src/ticking/tick_storage.h
+++ b/src/ticking/tick_storage.h
@@ -7,6 +7,7 @@
#include "platform/concurrency.h"
#include "platform/console_logging.h"
#include "platform/debugging.h"
+#include "platform/global_var.h"
#include "public_settings.h"
@@ -1037,3 +1038,5 @@ class TickStorage
}
} transactionsDigestAccess;
};
+
+GLOBAL_VAR_DECL TickStorage ts;
diff --git a/src/ticking/ticking.h b/src/ticking/ticking.h
index 4bbb0d61a..97396bed6 100644
--- a/src/ticking/ticking.h
+++ b/src/ticking/ticking.h
@@ -5,9 +5,6 @@
#include "network_messages/tick.h"
-#include "ticking/tick_storage.h"
-#include "ticking/pending_txs_pool.h"
-
#include "private_settings.h"
GLOBAL_VAR_DECL Tick etalonTick;
diff --git a/test/common_def.cpp b/test/common_def.cpp
index adba975fd..4c835edc7 100644
--- a/test/common_def.cpp
+++ b/test/common_def.cpp
@@ -3,6 +3,7 @@
#include "contract_testing.h"
#include "logging_test.h"
+#include "oracle_testing.h"
#include "platform/concurrency_impl.h"
#include "platform/profiling.h"
diff --git a/test/contract_testex.cpp b/test/contract_testex.cpp
index e56e85460..8bd206441 100644
--- a/test/contract_testex.cpp
+++ b/test/contract_testex.cpp
@@ -4,6 +4,7 @@
#include
#include "contract_testing.h"
+#include "oracle_testing.h"
static const id TESTEXA_CONTRACT_ID(TESTEXA_CONTRACT_INDEX, 0, 0, 0);
static const id TESTEXB_CONTRACT_ID(TESTEXB_CONTRACT_INDEX, 0, 0, 0);
@@ -144,7 +145,7 @@ class ContractTestingTestEx : protected ContractTesting
INIT_CONTRACT(QX);
callSystemProcedure(QX_CONTRACT_INDEX, INITIALIZE);
- EXPECT_TRUE(oracleEngine.init());
+ EXPECT_TRUE(oracleEngine.init(computorPublicKeys));
checkContractExecCleanup();
@@ -2101,48 +2102,6 @@ TEST(ContractTestEx, SystemCallbacksWithNegativeFeeReserve)
EXPECT_LT(getContractFeeReserve(TESTEXC_CONTRACT_INDEX), 0);
}
-static union
-{
- RequestResponseHeader header;
-
- struct
- {
- RequestResponseHeader header;
- OracleMachineQuery queryMetadata;
- unsigned char queryData[MAX_ORACLE_QUERY_SIZE];
- } omQuery;
-} enqueuedNetworkMessage;
-
-template
-void checkNetworkMessageOracleMachineQuery(uint64 expectedOracleQueryId, id expectedOracle, uint32 expectedTimeout)
-{
- EXPECT_EQ(enqueuedNetworkMessage.header.type(), OracleMachineQuery::type());
- EXPECT_GT(enqueuedNetworkMessage.header.size(), sizeof(RequestResponseHeader) + sizeof(OracleMachineQuery));
- uint32 queryDataSize = enqueuedNetworkMessage.header.size() - sizeof(RequestResponseHeader) - sizeof(OracleMachineQuery);
- EXPECT_LE(queryDataSize, (uint32)MAX_ORACLE_QUERY_SIZE);
- EXPECT_EQ(queryDataSize, sizeof(typename OracleInterface::OracleQuery));
- EXPECT_EQ(enqueuedNetworkMessage.omQuery.queryMetadata.oracleInterfaceIndex, OracleInterface::oracleInterfaceIndex);
- EXPECT_EQ(enqueuedNetworkMessage.omQuery.queryMetadata.oracleQueryId, expectedOracleQueryId);
- EXPECT_EQ(enqueuedNetworkMessage.omQuery.queryMetadata.timeoutInMilliseconds, expectedTimeout);
- const auto* q = (const OracleInterface::OracleQuery*)enqueuedNetworkMessage.omQuery.queryData;
- EXPECT_EQ(q->oracle, expectedOracle);
-}
-
-static void enqueueResponse(Peer* peer, unsigned int dataSize, unsigned char type, unsigned int dejavu, const void* data)
-{
- EXPECT_EQ(peer, (Peer*)0x1);
- EXPECT_LE(dataSize, sizeof(OracleMachineQuery) + MAX_ORACLE_QUERY_SIZE);
- EXPECT_TRUE(enqueuedNetworkMessage.header.checkAndSetSize(sizeof(RequestResponseHeader) + dataSize));
- enqueuedNetworkMessage.header.setType(type);
- enqueuedNetworkMessage.header.setDejavu(dejavu);
- copyMem(&enqueuedNetworkMessage.omQuery.queryMetadata, data, dataSize);
-}
-
-uint64 getContractOracleQueryId(uint32 tick, uint16 indexInTick)
-{
- return ((uint64)tick << 31) | (indexInTick + NUMBER_OF_TRANSACTIONS_PER_TICK);
-}
-
TEST(ContractTestEx, OracleQuery)
{
ContractTestingTestEx test;
@@ -2165,7 +2124,7 @@ TEST(ContractTestEx, OracleQuery)
expectedOracleQueryId = getContractOracleQueryId(system.tick, 2);
test.endTick();
++system.tick;
- checkNetworkMessageOracleMachineQuery(expectedOracleQueryId, id(0, 0, 0, 0), 20000);
+ checkNetworkMessageOracleMachineQuery(expectedOracleQueryId, OI::Price::getMockOracleId(), 20000);
expectedOracleQueryId = getContractOracleQueryId(system.tick, 0);
EXPECT_EQ(test.queryPriceOracle(USER1, id(2, 3, 4, 5), 13), expectedOracleQueryId);
diff --git a/test/oracle_engine.cpp b/test/oracle_engine.cpp
new file mode 100644
index 000000000..e3406402b
--- /dev/null
+++ b/test/oracle_engine.cpp
@@ -0,0 +1,467 @@
+#define NO_UEFI
+
+#include "oracle_testing.h"
+
+
+struct OracleEngineTest : public LoggingTest
+{
+ OracleEngineTest()
+ {
+ EXPECT_TRUE(initCommonBuffers());
+ EXPECT_TRUE(initSpecialEntities());
+ EXPECT_TRUE(initContractExec());
+ EXPECT_TRUE(ts.init());
+
+ // init computors
+ for (int computorIndex = 0; computorIndex < NUMBER_OF_COMPUTORS; computorIndex++)
+ {
+ broadcastedComputors.computors.publicKeys[computorIndex] = m256i(computorIndex * 2, 42, 13, 1337);
+ }
+
+ // setup tick and time
+ system.tick = 1000;
+ etalonTick.year = 25;
+ etalonTick.month = 12;
+ etalonTick.day = 15;
+ etalonTick.hour = 16;
+ etalonTick.minute = 51;
+ etalonTick.second = 12;
+ ts.beginEpoch(system.tick);
+ }
+
+ ~OracleEngineTest()
+ {
+ deinitCommonBuffers();
+ deinitContractExec();
+ ts.deinit();
+ }
+};
+
+template
+struct OracleEngineWithInitAndDeinit : public OracleEngine
+{
+ OracleEngineWithInitAndDeinit(const m256i* ownComputorPublicKeys)
+ {
+ this->init(ownComputorPublicKeys);
+ }
+
+ ~OracleEngineWithInitAndDeinit()
+ {
+ this->deinit();
+ }
+
+ void checkPendingState(int64_t queryId, uint16_t totalCommitTxExecuted, uint16_t ownCommitTxExecuted, uint8_t expectedStatus) const
+ {
+ uint32_t queryIndex;
+ EXPECT_TRUE(this->queryIdToIndex->get(queryId, queryIndex));
+ EXPECT_LT(queryIndex, this->oracleQueryCount);
+ const OracleQueryMetadata& oqm = this->queries[queryIndex];
+ EXPECT_EQ(oqm.status, expectedStatus);
+ EXPECT_TRUE(oqm.status == ORACLE_QUERY_STATUS_PENDING || oqm.status == ORACLE_QUERY_STATUS_COMMITTED);
+ const OracleReplyState& replyState = this->replyStates[oqm.statusVar.pending.replyStateIndex];
+ EXPECT_EQ((int)totalCommitTxExecuted, (int)replyState.totalCommits);
+ EXPECT_EQ((int)ownCommitTxExecuted, (int)replyState.ownReplyCommitExecCount);
+ }
+
+ void checkStatus(int64_t queryId, uint8_t expectedStatus) const
+ {
+ uint32_t queryIndex;
+ EXPECT_TRUE(this->queryIdToIndex->get(queryId, queryIndex));
+ EXPECT_LT(queryIndex, this->oracleQueryCount);
+ const OracleQueryMetadata& oqm = this->queries[queryIndex];
+ EXPECT_EQ(oqm.status, expectedStatus);
+ }
+};
+
+static void dummyNotificationProc(const QPI::QpiContextProcedureCall&, void* state, void* input, void* output, void* locals)
+{
+}
+
+TEST(OracleEngine, ContractQuerySuccess)
+{
+ OracleEngineTest test;
+
+ // simulate three nodes: one with 400 computor IDs, one with 200, and one with 76
+ const m256i* allCompPubKeys = broadcastedComputors.computors.publicKeys;
+ OracleEngineWithInitAndDeinit<400> oracleEngine1(allCompPubKeys);
+ OracleEngineWithInitAndDeinit<200> oracleEngine2(allCompPubKeys + 400);
+ OracleEngineWithInitAndDeinit<76> oracleEngine3(allCompPubKeys + 600);
+
+ OI::Price::OracleQuery priceQuery;
+ priceQuery.oracle = m256i(1, 2, 3, 4);
+ priceQuery.currency1 = m256i(2, 3, 4, 5);
+ priceQuery.currency2 = m256i(3, 4, 5, 6);
+ priceQuery.timestamp = QPI::DateAndTime::now();
+ QPI::uint32 interfaceIndex = 0;
+ QPI::uint16 contractIndex = 1;
+ QPI::uint32 timeout = 30000;
+ const QPI::uint32 notificationProcId = 12345;
+ EXPECT_TRUE(userProcedureRegistry->add(notificationProcId, { dummyNotificationProc, 1, 128, 128, 1 }));
+
+ //-------------------------------------------------------------------------
+ // start contract query / check message to OM node
+ QPI::sint64 queryId = oracleEngine1.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId);
+ EXPECT_EQ(queryId, getContractOracleQueryId(system.tick, 0));
+ checkNetworkMessageOracleMachineQuery(queryId, priceQuery.oracle, timeout);
+ EXPECT_EQ(queryId, oracleEngine2.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId));
+ EXPECT_EQ(queryId, oracleEngine3.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId));
+
+ //-------------------------------------------------------------------------
+ // get query contract data
+ OI::Price::OracleQuery priceQueryReturned;
+ EXPECT_TRUE(oracleEngine1.getOracleQuery(queryId, &priceQueryReturned, sizeof(priceQueryReturned)));
+ EXPECT_EQ(memcmp(&priceQueryReturned, &priceQuery, sizeof(priceQuery)), 0);
+
+ //-------------------------------------------------------------------------
+ // process simulated reply from OM node
+ struct
+ {
+ OracleMachineReply metatdata;
+ OI::Price::OracleReply data;
+ } priceOracleMachineReply;
+
+ priceOracleMachineReply.metatdata.oracleMachineErrorFlags = 0;
+ priceOracleMachineReply.metatdata.oracleQueryId = queryId;
+ priceOracleMachineReply.data.numerator = 1234;
+ priceOracleMachineReply.data.denominator = 1;
+
+ oracleEngine1.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+ oracleEngine2.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+ oracleEngine3.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+
+ // duplicate from other node
+ oracleEngine1.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+
+ // other value from other node
+ priceOracleMachineReply.data.numerator = 1233;
+ oracleEngine1.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+
+ //-------------------------------------------------------------------------
+ // create reply commit tx (with local computor index 0 / global computor index 0)
+ uint8_t txBuffer[MAX_TRANSACTION_SIZE];
+ auto* replyCommitTx = (OracleReplyCommitTransactionPrefix*)txBuffer;
+ EXPECT_EQ(oracleEngine1.getReplyCommitTransaction(txBuffer, 0, 0, system.tick + 3, 0), UINT32_MAX);
+ {
+ EXPECT_EQ((int)replyCommitTx->inputType, (int)OracleReplyCommitTransactionPrefix::transactionType());
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[0]);
+ EXPECT_TRUE(isZero(replyCommitTx->destinationPublicKey));
+ EXPECT_EQ(replyCommitTx->tick, system.tick + 3);
+ EXPECT_EQ((int)replyCommitTx->inputSize, (int)sizeof(OracleReplyCommitTransactionItem));
+ }
+
+ // second call in the same tick: no commits for tx
+ EXPECT_EQ(oracleEngine1.getReplyCommitTransaction(txBuffer, 0, 0, system.tick + 3, 0), 0);
+
+ // process commit tx
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+
+ // no reveal yet
+ EXPECT_EQ(oracleEngine1.getReplyRevealTransaction(txBuffer, 0, system.tick + 3, 0), 0);
+
+ // no notifications
+ EXPECT_EQ(oracleEngine1.getNotification(), nullptr);
+
+ //-------------------------------------------------------------------------
+ // create and process enough reply commit tx to trigger reval tx
+
+ // create tx of node 3 computers and process in all nodes
+ for (int i = 600; i < 676; ++i)
+ {
+ EXPECT_EQ(oracleEngine3.getReplyCommitTransaction(txBuffer, i, i - 600, system.tick + 3, 0), UINT32_MAX);
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[i]);
+ const int txFromNode3 = i - 600;
+ oracleEngine1.checkPendingState(queryId, txFromNode3 + 1, 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine1.checkPendingState(queryId, txFromNode3 + 2, 1, ORACLE_QUERY_STATUS_PENDING);
+ oracleEngine2.checkPendingState(queryId, txFromNode3 + 1, 0, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine2.checkPendingState(queryId, txFromNode3 + 2, 0, ORACLE_QUERY_STATUS_PENDING);
+ oracleEngine3.checkPendingState(queryId, txFromNode3 + 1, txFromNode3 + 0, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine3.checkPendingState(queryId, txFromNode3 + 2, txFromNode3 + 1, ORACLE_QUERY_STATUS_PENDING);
+ }
+
+ // create tx of node 2 computers and process in all nodes
+ for (int i = 400; i < 600; ++i)
+ {
+ EXPECT_EQ(oracleEngine2.getReplyCommitTransaction(txBuffer, i, i - 400, system.tick + 3, 0), UINT32_MAX);
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[i]);
+ const int txFromNode2 = i - 400;
+ oracleEngine1.checkPendingState(queryId, txFromNode2 + 77, 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine1.checkPendingState(queryId, txFromNode2 + 78, 1, ORACLE_QUERY_STATUS_PENDING);
+ oracleEngine2.checkPendingState(queryId, txFromNode2 + 77, txFromNode2 + 0, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine2.checkPendingState(queryId, txFromNode2 + 78, txFromNode2 + 1, ORACLE_QUERY_STATUS_PENDING);
+ oracleEngine3.checkPendingState(queryId, txFromNode2 + 77, 76, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine3.checkPendingState(queryId, txFromNode2 + 78, 76, ORACLE_QUERY_STATUS_PENDING);
+ }
+
+ // create tx of node 1 computers and process in all nodes
+ for (int i = 1; i < 400; ++i)
+ {
+ bool expectStatusCommitted = (i + 276) >= 451;
+ EXPECT_EQ(oracleEngine1.getReplyCommitTransaction(txBuffer, i, i, system.tick + 3, 0), ((expectStatusCommitted) ? 0 : UINT32_MAX));
+ if (!expectStatusCommitted)
+ {
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[i]);
+ const int txFromNode1 = i;
+ uint8_t newStatus = (txFromNode1 + 276 < 450) ? ORACLE_QUERY_STATUS_PENDING : ORACLE_QUERY_STATUS_COMMITTED;
+ oracleEngine1.checkPendingState(queryId, txFromNode1 + 276, txFromNode1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine1.checkPendingState(queryId, txFromNode1 + 277, txFromNode1 + 1, newStatus);
+ oracleEngine2.checkPendingState(queryId, txFromNode1 + 276, 200, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine2.checkPendingState(queryId, txFromNode1 + 277, 200, newStatus);
+ oracleEngine3.checkPendingState(queryId, txFromNode1 + 276, 76, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine3.checkPendingState(queryId, txFromNode1 + 277, 76, newStatus);
+ }
+ else
+ {
+ oracleEngine1.checkPendingState(queryId, 451, 175, ORACLE_QUERY_STATUS_COMMITTED);
+ oracleEngine2.checkPendingState(queryId, 451, 200, ORACLE_QUERY_STATUS_COMMITTED);
+ oracleEngine3.checkPendingState(queryId, 451, 76, ORACLE_QUERY_STATUS_COMMITTED);
+ }
+ }
+
+ //-------------------------------------------------------------------------
+ // reply reveal tx
+
+ // success for one tx
+ EXPECT_EQ(oracleEngine1.getReplyRevealTransaction(txBuffer, 0, system.tick + 3, 0), 1);
+ EXPECT_EQ(oracleEngine1.getReplyRevealTransaction(txBuffer, 0, system.tick + 3, 1), 0);
+
+ // second call does not provide the same tx again
+ EXPECT_EQ(oracleEngine1.getReplyRevealTransaction(txBuffer, 0, system.tick + 3, 0), 0);
+
+ system.tick += 3;
+ auto* replyRevealTx = (OracleReplyRevealTransactionPrefix*)txBuffer;
+ const unsigned int txIndex = 10;
+ addOracleTransactionToTickStorage(replyRevealTx, txIndex);
+ oracleEngine1.processOracleReplyRevealTransaction(replyRevealTx, txIndex);
+
+ //-------------------------------------------------------------------------
+ // notifications
+ const OracleNotificationData* notification = oracleEngine1.getNotification();
+ EXPECT_NE(notification, nullptr);
+ EXPECT_EQ((int)notification->contractIndex, (int)contractIndex);
+ EXPECT_EQ(notification->procedureId, notificationProcId);
+ EXPECT_EQ((int)notification->inputSize, sizeof(OracleNotificationInput));
+ const auto* notificationInput = (const OracleNotificationInput*) & notification->inputBuffer;
+ EXPECT_EQ(notificationInput->queryId, replyRevealTx->queryId);
+ EXPECT_EQ(notificationInput->status, ORACLE_QUERY_STATUS_SUCCESS);
+ EXPECT_EQ(notificationInput->subscriptionId, 0);
+ EXPECT_EQ(notificationInput->reply.numerator, 1234);
+ EXPECT_EQ(notificationInput->reply.denominator, 1);
+
+ // no additional notifications
+ EXPECT_EQ(oracleEngine1.getNotification(), nullptr);
+}
+
+TEST(OracleEngine, ContractQueryUnresolvable)
+{
+ OracleEngineTest test;
+
+ // simulate three nodes: two with 200 computor IDs each, one with 276 IDs
+ const m256i* allCompPubKeys = broadcastedComputors.computors.publicKeys;
+ OracleEngineWithInitAndDeinit<200> oracleEngine1(allCompPubKeys);
+ OracleEngineWithInitAndDeinit<200> oracleEngine2(allCompPubKeys + 200);
+ OracleEngineWithInitAndDeinit<276> oracleEngine3(allCompPubKeys + 400);
+
+
+ OI::Price::OracleQuery priceQuery;
+ priceQuery.oracle = m256i(10, 20, 30, 40);
+ priceQuery.currency1 = m256i(20, 30, 40, 50);
+ priceQuery.currency2 = m256i(30, 40, 50, 60);
+ priceQuery.timestamp = QPI::DateAndTime::now();
+ QPI::uint32 interfaceIndex = 0;
+ QPI::uint16 contractIndex = 2;
+ QPI::uint32 timeout = 120000;
+ const QPI::uint32 notificationProcId = 12345;
+ EXPECT_TRUE(userProcedureRegistry->add(notificationProcId, { dummyNotificationProc, 1, 1024, 128, 1 }));
+
+ //-------------------------------------------------------------------------
+ // start contract query / check message to OM node
+ QPI::sint64 queryId = oracleEngine1.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId);
+ EXPECT_EQ(queryId, getContractOracleQueryId(system.tick, 0));
+ EXPECT_EQ(queryId, oracleEngine2.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId));
+ EXPECT_EQ(queryId, oracleEngine3.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId));
+ checkNetworkMessageOracleMachineQuery(queryId, priceQuery.oracle, timeout);
+
+ //-------------------------------------------------------------------------
+ // get query contract data
+ OI::Price::OracleQuery priceQueryReturned;
+ EXPECT_TRUE(oracleEngine1.getOracleQuery(queryId, &priceQueryReturned, sizeof(priceQueryReturned)));
+ EXPECT_EQ(memcmp(&priceQueryReturned, &priceQuery, sizeof(priceQuery)), 0);
+
+ //-------------------------------------------------------------------------
+ // process simulated reply from OM nodes
+ struct
+ {
+ OracleMachineReply metatdata;
+ OI::Price::OracleReply data;
+ } priceOracleMachineReply;
+
+ // reply received/committed by node 1 and 2
+ priceOracleMachineReply.metatdata.oracleMachineErrorFlags = 0;
+ priceOracleMachineReply.metatdata.oracleQueryId = queryId;
+ priceOracleMachineReply.data.numerator = 1234;
+ priceOracleMachineReply.data.denominator = 1;
+ oracleEngine1.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+ oracleEngine2.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+
+ // reply received/committed by node 1 and 2
+ priceOracleMachineReply.data.numerator = 1233;
+ priceOracleMachineReply.data.denominator = 1;
+ oracleEngine3.processOracleMachineReply(&priceOracleMachineReply.metatdata, sizeof(priceOracleMachineReply));
+
+
+ //-------------------------------------------------------------------------
+ // create and process reply commits of node 3 computers and process in all nodes
+ uint8_t txBuffer[MAX_TRANSACTION_SIZE];
+ auto* replyCommitTx = (OracleReplyCommitTransactionPrefix*)txBuffer;
+ for (int ownCompIdx = 0; ownCompIdx < 200; ++ownCompIdx)
+ {
+ int allCompIdx = ownCompIdx;
+ EXPECT_EQ(oracleEngine1.getReplyCommitTransaction(txBuffer, allCompIdx, ownCompIdx, system.tick + 3, 0), UINT32_MAX);
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[allCompIdx]);
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine1.checkPendingState(queryId, 3 * ownCompIdx + 1, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine2.checkPendingState(queryId, 3 * ownCompIdx + 1, ownCompIdx, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine3.checkPendingState(queryId, 3 * ownCompIdx + 1, ownCompIdx, ORACLE_QUERY_STATUS_PENDING);
+
+ allCompIdx = ownCompIdx + 200;
+ EXPECT_EQ(oracleEngine2.getReplyCommitTransaction(txBuffer, allCompIdx, ownCompIdx, system.tick + 3, 0), UINT32_MAX);
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[allCompIdx]);
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine1.checkPendingState(queryId, 3 * ownCompIdx + 2, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine2.checkPendingState(queryId, 3 * ownCompIdx + 2, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine3.checkPendingState(queryId, 3 * ownCompIdx + 2, ownCompIdx, ORACLE_QUERY_STATUS_PENDING);
+
+ allCompIdx = ownCompIdx + 400;
+ EXPECT_EQ(oracleEngine3.getReplyCommitTransaction(txBuffer, allCompIdx, ownCompIdx, system.tick + 3, 0), UINT32_MAX);
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[allCompIdx]);
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine1.checkPendingState(queryId, 3 * ownCompIdx + 3, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine2.checkPendingState(queryId, 3 * ownCompIdx + 3, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ oracleEngine3.checkPendingState(queryId, 3 * ownCompIdx + 3, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ }
+
+ // create/process transcations that contradict with majority digest and turn status into unresolvable
+ for (int allCompIdx = 600; allCompIdx < 676; ++allCompIdx)
+ {
+ int ownCompIdx = allCompIdx - 400;
+ int unknownVotes = 676 - allCompIdx;
+ bool moreTxExpected = (unknownVotes > 450 - 400);
+ EXPECT_EQ(oracleEngine3.getReplyCommitTransaction(txBuffer, allCompIdx, ownCompIdx, system.tick + 3, 0), moreTxExpected ? UINT32_MAX : 0);
+ if (moreTxExpected)
+ {
+ EXPECT_EQ(replyCommitTx->sourcePublicKey, allCompPubKeys[allCompIdx]);
+
+ EXPECT_TRUE(oracleEngine1.processOracleReplyCommitTransaction(replyCommitTx));
+ EXPECT_TRUE(oracleEngine2.processOracleReplyCommitTransaction(replyCommitTx));
+ EXPECT_TRUE(oracleEngine3.processOracleReplyCommitTransaction(replyCommitTx));
+ }
+
+ if (unknownVotes > 451 - 400)
+ {
+ oracleEngine1.checkPendingState(queryId, allCompIdx + 1, 200, ORACLE_QUERY_STATUS_PENDING);
+ oracleEngine2.checkPendingState(queryId, allCompIdx + 1, 200, ORACLE_QUERY_STATUS_PENDING);
+ oracleEngine3.checkPendingState(queryId, allCompIdx + 1, ownCompIdx + 1, ORACLE_QUERY_STATUS_PENDING);
+ }
+ else
+ {
+ oracleEngine1.checkStatus(queryId, ORACLE_QUERY_STATUS_UNRESOLVABLE);
+ oracleEngine2.checkStatus(queryId, ORACLE_QUERY_STATUS_UNRESOLVABLE);
+ oracleEngine3.checkStatus(queryId, ORACLE_QUERY_STATUS_UNRESOLVABLE);
+ }
+ }
+
+ //-------------------------------------------------------------------------
+ // notifications
+ const OracleNotificationData* notification = oracleEngine1.getNotification();
+ EXPECT_NE(notification, nullptr);
+ EXPECT_EQ((int)notification->contractIndex, (int)contractIndex);
+ EXPECT_EQ(notification->procedureId, notificationProcId);
+ EXPECT_EQ((int)notification->inputSize, sizeof(OracleNotificationInput));
+ const auto* notificationInput = (const OracleNotificationInput*) & notification->inputBuffer;
+ EXPECT_EQ(notificationInput->queryId, queryId);
+ EXPECT_EQ(notificationInput->status, ORACLE_QUERY_STATUS_UNRESOLVABLE);
+ EXPECT_EQ(notificationInput->subscriptionId, 0);
+ EXPECT_EQ(notificationInput->reply.numerator, 0);
+ EXPECT_EQ(notificationInput->reply.denominator, 0);
+
+ // no additional notifications
+ EXPECT_EQ(oracleEngine1.getNotification(), nullptr);
+}
+
+TEST(OracleEngine, ContractQueryTimeout)
+{
+ OracleEngineTest test;
+
+ // simulate one node
+ const m256i* allCompPubKeys = broadcastedComputors.computors.publicKeys;
+ OracleEngineWithInitAndDeinit<676> oracleEngine1(allCompPubKeys);
+
+ OI::Price::OracleQuery priceQuery;
+ priceQuery.oracle = m256i(10, 20, 30, 40);
+ priceQuery.currency1 = m256i(20, 30, 40, 50);
+ priceQuery.currency2 = m256i(30, 40, 50, 60);
+ priceQuery.timestamp = QPI::DateAndTime::now();
+ QPI::uint32 interfaceIndex = 0;
+ QPI::uint16 contractIndex = 2;
+ QPI::uint32 timeout = 10000;
+ const QPI::uint32 notificationProcId = 12345;
+ EXPECT_TRUE(userProcedureRegistry->add(notificationProcId, { dummyNotificationProc, 1, 1024, 128, 1 }));
+
+ //-------------------------------------------------------------------------
+ // start contract query / check message to OM node
+ QPI::sint64 queryId = oracleEngine1.startContractQuery(contractIndex, interfaceIndex, &priceQuery, sizeof(priceQuery), timeout, notificationProcId);
+ checkNetworkMessageOracleMachineQuery(queryId, priceQuery.oracle, timeout);
+
+ //-------------------------------------------------------------------------
+ // get query contract data
+ OI::Price::OracleQuery priceQueryReturned;
+ EXPECT_TRUE(oracleEngine1.getOracleQuery(queryId, &priceQueryReturned, sizeof(priceQueryReturned)));
+ EXPECT_EQ(memcmp(&priceQueryReturned, &priceQuery, sizeof(priceQuery)), 0);
+
+ //-------------------------------------------------------------------------
+ // timeout: no response from OM node
+ ++system.tick;
+ ++etalonTick.hour;
+ oracleEngine1.processTimeouts();
+
+ //-------------------------------------------------------------------------
+ // notifications
+ const OracleNotificationData* notification = oracleEngine1.getNotification();
+ EXPECT_NE(notification, nullptr);
+ EXPECT_EQ((int)notification->contractIndex, (int)contractIndex);
+ EXPECT_EQ(notification->procedureId, notificationProcId);
+ EXPECT_EQ((int)notification->inputSize, sizeof(OracleNotificationInput));
+ const auto* notificationInput = (const OracleNotificationInput*) & notification->inputBuffer;
+ EXPECT_EQ(notificationInput->queryId, queryId);
+ EXPECT_EQ(notificationInput->status, ORACLE_QUERY_STATUS_TIMEOUT);
+ EXPECT_EQ(notificationInput->subscriptionId, 0);
+ EXPECT_EQ(notificationInput->reply.numerator, 0);
+ EXPECT_EQ(notificationInput->reply.denominator, 0);
+
+ // no additional notifications
+ EXPECT_EQ(oracleEngine1.getNotification(), nullptr);
+}
+
+/*
+Tests:
+- oracleEngine.getReplyCommitTransaction() with more than 1 commit / tx
+- processOracleReplyCommitTransaction wihtout get getReplyCommitTransaction
+- trigger failure
+*/
\ No newline at end of file
diff --git a/test/oracle_testing.h b/test/oracle_testing.h
new file mode 100644
index 000000000..c84ac3116
--- /dev/null
+++ b/test/oracle_testing.h
@@ -0,0 +1,74 @@
+#pragma once
+
+// Include this first, to ensure "logging/logging.h" isn't included before the custom LOG_BUFFER_SIZE has been defined
+#include "logging_test.h"
+
+#undef MAX_NUMBER_OF_TICKS_PER_EPOCH
+#define MAX_NUMBER_OF_TICKS_PER_EPOCH 50
+#undef TICKS_TO_KEEP_FROM_PRIOR_EPOCH
+#define TICKS_TO_KEEP_FROM_PRIOR_EPOCH 5
+#include "ticking/tick_storage.h"
+
+#include "gtest/gtest.h"
+
+#include "oracle_core/oracle_engine.h"
+#include "contract_core/qpi_ticking_impl.h"
+#include "contract_core/qpi_spectrum_impl.h"
+
+
+union EnqueuedNetworkMessage
+{
+ RequestResponseHeader header;
+
+ struct
+ {
+ RequestResponseHeader header;
+ OracleMachineQuery queryMetadata;
+ unsigned char queryData[MAX_ORACLE_QUERY_SIZE];
+ } omQuery;
+};
+
+GLOBAL_VAR_DECL EnqueuedNetworkMessage enqueuedNetworkMessage;
+
+template
+static void checkNetworkMessageOracleMachineQuery(QPI::uint64 expectedOracleQueryId, QPI::id expectedOracle, QPI::uint32 expectedTimeout)
+{
+ EXPECT_EQ(enqueuedNetworkMessage.header.type(), OracleMachineQuery::type());
+ EXPECT_GT(enqueuedNetworkMessage.header.size(), sizeof(RequestResponseHeader) + sizeof(OracleMachineQuery));
+ QPI::uint32 queryDataSize = enqueuedNetworkMessage.header.size() - sizeof(RequestResponseHeader) - sizeof(OracleMachineQuery);
+ EXPECT_LE(queryDataSize, (QPI::uint32)MAX_ORACLE_QUERY_SIZE);
+ EXPECT_EQ(queryDataSize, sizeof(typename OracleInterface::OracleQuery));
+ EXPECT_EQ(enqueuedNetworkMessage.omQuery.queryMetadata.oracleInterfaceIndex, OracleInterface::oracleInterfaceIndex);
+ EXPECT_EQ(enqueuedNetworkMessage.omQuery.queryMetadata.oracleQueryId, expectedOracleQueryId);
+ EXPECT_EQ(enqueuedNetworkMessage.omQuery.queryMetadata.timeoutInMilliseconds, expectedTimeout);
+ const auto* q = (const OracleInterface::OracleQuery*)enqueuedNetworkMessage.omQuery.queryData;
+ EXPECT_EQ(q->oracle, expectedOracle);
+}
+
+static void enqueueResponse(Peer* peer, unsigned int dataSize, unsigned char type, unsigned int dejavu, const void* data)
+{
+ EXPECT_EQ(peer, (Peer*)0x1);
+ EXPECT_LE(dataSize, sizeof(OracleMachineQuery) + MAX_ORACLE_QUERY_SIZE);
+ EXPECT_TRUE(enqueuedNetworkMessage.header.checkAndSetSize(sizeof(RequestResponseHeader) + dataSize));
+ enqueuedNetworkMessage.header.setType(type);
+ enqueuedNetworkMessage.header.setDejavu(dejavu);
+ copyMem(&enqueuedNetworkMessage.omQuery.queryMetadata, data, dataSize);
+}
+
+static inline QPI::uint64 getContractOracleQueryId(QPI::uint32 tick, QPI::uint32 indexInTick)
+{
+ return ((QPI::uint64)tick << 31) | (indexInTick + NUMBER_OF_TRANSACTIONS_PER_TICK);
+}
+
+static void addOracleTransactionToTickStorage(const Transaction* tx, unsigned int txIndex)
+{
+ const unsigned int txSize = tx->totalSize();
+ auto* offsets = ts.tickTransactionOffsets.getByTickInCurrentEpoch(tx->tick);
+ if (ts.nextTickTransactionOffset + txSize <= ts.tickTransactions.storageSpaceCurrentEpoch)
+ {
+ EXPECT_EQ(offsets[txIndex], 0);
+ offsets[txIndex] = ts.nextTickTransactionOffset;
+ copyMem(ts.tickTransactions(ts.nextTickTransactionOffset), tx, txSize);
+ ts.nextTickTransactionOffset += txSize;
+ }
+}
diff --git a/test/qpi.cpp b/test/qpi.cpp
index 9ea0b67fe..6b7fc1468 100644
--- a/test/qpi.cpp
+++ b/test/qpi.cpp
@@ -343,6 +343,32 @@ TEST(TestCoreQPI, Mod) {
EXPECT_EQ(QPI::mod(2, -1), 0);
}
+TEST(TestCoreQPI, IdFromCharacters)
+{
+ using namespace QPI::Ch;
+
+ QPI::id test('t', 'e', s, t, '!');
+ EXPECT_EQ(std::string((char*)test.m256i_i8), std::string("test!"));
+
+ test = QPI::id(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z, space, dot, comma, colon, semicolon, null);
+ EXPECT_EQ(std::string((char*)test.m256i_i8), std::string("abcdefghijklmnopqrstuvwxyz .,:;"));
+
+ test = QPI::id(A, B, C, D, E, F, G, H, I, J, K, L, M, N, O, P, Q, R, S, T, U, V, W, X, Y, Z, slash, backslash);
+ EXPECT_EQ(std::string((char*)test.m256i_i8), std::string("ABCDEFGHIJKLMNOPQRSTUVWXYZ/\\"));
+
+ test = QPI::id(_0, _1, _2, _3, _4, _5, _6, _7, _8, _9);
+ EXPECT_EQ(std::string((char*)test.m256i_i8), std::string("0123456789"));
+
+ test = QPI::id(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0);
+ EXPECT_TRUE(isZero(test));
+
+ test = QPI::id(0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, space);
+ EXPECT_EQ(test.i8._31, ' ');
+ test.m256i_i8[31] = 0;
+ EXPECT_TRUE(isZero(test));
+}
+
+
struct ContractExecInitDeinitGuard
{
ContractExecInitDeinitGuard()
diff --git a/test/test.vcxproj b/test/test.vcxproj
index 7eda0af5b..c070ea2b4 100644
--- a/test/test.vcxproj
+++ b/test/test.vcxproj
@@ -114,6 +114,7 @@
+
@@ -126,6 +127,7 @@
+
@@ -191,4 +193,4 @@
-
\ No newline at end of file
+
diff --git a/test/test.vcxproj.filters b/test/test.vcxproj.filters
index 391098eb9..e1c0ac90c 100644
--- a/test/test.vcxproj.filters
+++ b/test/test.vcxproj.filters
@@ -47,6 +47,7 @@
+
@@ -54,6 +55,7 @@
+
@@ -68,4 +70,4 @@
core
-
\ No newline at end of file
+
diff --git a/test/tick_storage.cpp b/test/tick_storage.cpp
index 5876ace7f..103390c99 100644
--- a/test/tick_storage.cpp
+++ b/test/tick_storage.cpp
@@ -39,101 +39,100 @@ class TestTickStorage : public TickStorage
nextTickTransactionOffset += transactionSize;
}
}
-};
-
-TestTickStorage ts;
-
-void addTick(unsigned int tick, unsigned int seed, unsigned short maxTransactions)
-{
- // use pseudo-random sequence
- std::mt19937 gen32(seed);
-
- // add tick data
- TickData& td = ts.tickData.getByTickInCurrentEpoch(tick);
- td.epoch = 1234;
- td.tick = tick;
- // add computor ticks
- Tick* computorTicks = ts.ticks.getByTickInCurrentEpoch(tick);
- for (int i = 0; i < NUMBER_OF_COMPUTORS; ++i)
+ void addTick(unsigned int tick, unsigned int seed, unsigned short maxTransactions)
{
- computorTicks[i].epoch = 1234;
- computorTicks[i].computorIndex = i;
- computorTicks[i].tick = tick;
- computorTicks[i].prevResourceTestingDigest = gen32();
- }
+ // use pseudo-random sequence
+ std::mt19937 gen32(seed);
- // add transactions of tick
- unsigned int transactionNum = gen32() % (maxTransactions + 1);
- unsigned int orderMode = gen32() % 2;
- unsigned int transactionSlot;
- for (unsigned int transaction = 0; transaction < transactionNum; ++transaction)
- {
- if (orderMode == 0)
- transactionSlot = transaction; // standard order
- else if (orderMode == 1)
- transactionSlot = transactionNum - 1 - transaction; // backward order
- ts.addTransaction(tick, transactionSlot, gen32() % MAX_INPUT_SIZE);
- }
- ts.checkStateConsistencyWithAssert();
-}
-
-void checkTick(unsigned int tick, unsigned int seed, unsigned short maxTransactions, bool previousEpoch = false)
-{
- // only last ticks of previous epoch are kept in storage -> check okay
- if (previousEpoch && !ts.tickInPreviousEpochStorage(tick))
- return;
+ // add tick data
+ TickData& td = tickData.getByTickInCurrentEpoch(tick);
+ td.epoch = 1234;
+ td.tick = tick;
- // use pseudo-random sequence
- std::mt19937 gen32(seed);
-
- // check tick data
- TickData& td = previousEpoch ? ts.tickData.getByTickInPreviousEpoch(tick) : ts.tickData.getByTickInCurrentEpoch(tick);
- EXPECT_EQ((int)td.epoch, (int)1234);
- EXPECT_EQ(td.tick, tick);
-
- // check computor ticks
- Tick* computorTicks = previousEpoch ? ts.ticks.getByTickInPreviousEpoch(tick) : ts.ticks.getByTickInCurrentEpoch(tick);
- for (int i = 0; i < NUMBER_OF_COMPUTORS; ++i)
- {
- EXPECT_EQ((int)computorTicks[i].epoch, (int)1234);
- EXPECT_EQ((int)computorTicks[i].computorIndex, (int)i);
- EXPECT_EQ(computorTicks[i].tick, tick);
- EXPECT_EQ(computorTicks[i].prevResourceTestingDigest, gen32());
- }
+ // add computor ticks
+ Tick* computorTicks = ticks.getByTickInCurrentEpoch(tick);
+ for (int i = 0; i < NUMBER_OF_COMPUTORS; ++i)
+ {
+ computorTicks[i].epoch = 1234;
+ computorTicks[i].computorIndex = i;
+ computorTicks[i].tick = tick;
+ computorTicks[i].prevResourceTestingDigest = gen32();
+ }
- // check transactions of tick
- {
- const auto* offsets = previousEpoch ? ts.tickTransactionOffsets.getByTickInPreviousEpoch(tick) : ts.tickTransactionOffsets.getByTickInCurrentEpoch(tick);
+ // add transactions of tick
unsigned int transactionNum = gen32() % (maxTransactions + 1);
unsigned int orderMode = gen32() % 2;
unsigned int transactionSlot;
-
for (unsigned int transaction = 0; transaction < transactionNum; ++transaction)
{
- int expectedInputSize = (int)(gen32() % MAX_INPUT_SIZE);
-
if (orderMode == 0)
transactionSlot = transaction; // standard order
else if (orderMode == 1)
transactionSlot = transactionNum - 1 - transaction; // backward order
+ addTransaction(tick, transactionSlot, gen32() % MAX_INPUT_SIZE);
+ }
+ checkStateConsistencyWithAssert();
+ }
+
+ void checkTick(unsigned int tick, unsigned int seed, unsigned short maxTransactions, bool previousEpoch = false)
+ {
+ // only last ticks of previous epoch are kept in storage -> check okay
+ if (previousEpoch && !tickInPreviousEpochStorage(tick))
+ return;
- // If previousEpoch, some transactions at the beginning may not have fit into the storage and are missing -> check okay
- // If current epoch, some may be missing at he end due to limited storage -> check okay
- if (!offsets[transactionSlot])
- continue;
+ // use pseudo-random sequence
+ std::mt19937 gen32(seed);
- Transaction* tp = ts.tickTransactions(offsets[transactionSlot]);
- EXPECT_TRUE(tp->checkValidity());
- EXPECT_EQ(tp->tick, tick);
- EXPECT_EQ((int)tp->inputSize, expectedInputSize);
+ // check tick data
+ TickData& td = previousEpoch ? tickData.getByTickInPreviousEpoch(tick) : tickData.getByTickInCurrentEpoch(tick);
+ EXPECT_EQ((int)td.epoch, (int)1234);
+ EXPECT_EQ(td.tick, tick);
+
+ // check computor ticks
+ Tick* computorTicks = previousEpoch ? ticks.getByTickInPreviousEpoch(tick) : ticks.getByTickInCurrentEpoch(tick);
+ for (int i = 0; i < NUMBER_OF_COMPUTORS; ++i)
+ {
+ EXPECT_EQ((int)computorTicks[i].epoch, (int)1234);
+ EXPECT_EQ((int)computorTicks[i].computorIndex, (int)i);
+ EXPECT_EQ(computorTicks[i].tick, tick);
+ EXPECT_EQ(computorTicks[i].prevResourceTestingDigest, gen32());
}
- }
-}
+ // check transactions of tick
+ {
+ const auto* offsets = previousEpoch ? tickTransactionOffsets.getByTickInPreviousEpoch(tick) : tickTransactionOffsets.getByTickInCurrentEpoch(tick);
+ unsigned int transactionNum = gen32() % (maxTransactions + 1);
+ unsigned int orderMode = gen32() % 2;
+ unsigned int transactionSlot;
+
+ for (unsigned int transaction = 0; transaction < transactionNum; ++transaction)
+ {
+ int expectedInputSize = (int)(gen32() % MAX_INPUT_SIZE);
+
+ if (orderMode == 0)
+ transactionSlot = transaction; // standard order
+ else if (orderMode == 1)
+ transactionSlot = transactionNum - 1 - transaction; // backward order
+
+ // If previousEpoch, some transactions at the beginning may not have fit into the storage and are missing -> check okay
+ // If current epoch, some may be missing at he end due to limited storage -> check okay
+ if (!offsets[transactionSlot])
+ continue;
+
+ Transaction* tp = tickTransactions(offsets[transactionSlot]);
+ EXPECT_TRUE(tp->checkValidity());
+ EXPECT_EQ(tp->tick, tick);
+ EXPECT_EQ((int)tp->inputSize, expectedInputSize);
+ }
+ }
+ }
+};
-TEST(TestCoreTickStorage, EpochTransition) {
+TEST(TestCoreTickStorage, EpochTransition)
+{
+ TestTickStorage ts;
unsigned int seed = 42;
// use pseudo-random sequence
@@ -170,11 +169,11 @@ TEST(TestCoreTickStorage, EpochTransition) {
// add ticks
for (int i = 0; i < firstEpochTicks; ++i)
- addTick(firstEpochTick0 + i, firstEpochSeeds[i], maxTransactions);
+ ts.addTick(firstEpochTick0 + i, firstEpochSeeds[i], maxTransactions);
// check ticks
for (int i = 0; i < firstEpochTicks; ++i)
- checkTick(firstEpochTick0 + i, firstEpochSeeds[i], maxTransactions);
+ ts.checkTick(firstEpochTick0 + i, firstEpochSeeds[i], maxTransactions);
// Epoch transistion
ts.beginEpoch(secondEpochTick0);
@@ -182,14 +181,14 @@ TEST(TestCoreTickStorage, EpochTransition) {
// add ticks
for (int i = 0; i < secondEpochTicks; ++i)
- addTick(secondEpochTick0 + i, secondEpochSeeds[i], maxTransactions);
+ ts.addTick(secondEpochTick0 + i, secondEpochSeeds[i], maxTransactions);
// check ticks
for (int i = 0; i < secondEpochTicks; ++i)
- checkTick(secondEpochTick0 + i, secondEpochSeeds[i], maxTransactions);
+ ts.checkTick(secondEpochTick0 + i, secondEpochSeeds[i], maxTransactions);
bool previousEpoch = true;
for (int i = 0; i < firstEpochTicks; ++i)
- checkTick(firstEpochTick0 + i, firstEpochSeeds[i], maxTransactions, previousEpoch);
+ ts.checkTick(firstEpochTick0 + i, firstEpochSeeds[i], maxTransactions, previousEpoch);
// Epoch transistion
ts.beginEpoch(thirdEpochTick0);
@@ -197,13 +196,13 @@ TEST(TestCoreTickStorage, EpochTransition) {
// add ticks
for (int i = 0; i < thirdEpochTicks; ++i)
- addTick(thirdEpochTick0 + i, thirdEpochSeeds[i], maxTransactions);
+ ts.addTick(thirdEpochTick0 + i, thirdEpochSeeds[i], maxTransactions);
// check ticks
for (int i = 0; i < thirdEpochTicks; ++i)
- checkTick(thirdEpochTick0 + i, thirdEpochSeeds[i], maxTransactions);
+ ts.checkTick(thirdEpochTick0 + i, thirdEpochSeeds[i], maxTransactions);
for (int i = 0; i < secondEpochTicks; ++i)
- checkTick(secondEpochTick0 + i, secondEpochSeeds[i], maxTransactions, previousEpoch);
+ ts.checkTick(secondEpochTick0 + i, secondEpochSeeds[i], maxTransactions, previousEpoch);
ts.deinit();
}