@@ -374,6 +374,134 @@ TEST(ConsumerStateTable, set_del_set)
374
374
EXPECT_EQ (qlen, 0U );
375
375
}
376
376
377
+ TEST (ConsumerStateTable, set_pop_del_set_pop_get)
378
+ {
379
+ clearDB ();
380
+
381
+ /* Prepare producer */
382
+ int index = 0 ;
383
+ string tableName = " UT_REDIS_THREAD_" + to_string (index );
384
+ DBConnector db (TEST_DB, " localhost" , 6379 , 0 );
385
+ ProducerStateTable p (&db, tableName);
386
+ string key = " TheKey" ;
387
+ int maxNumOfFields = 2 ;
388
+
389
+ /* First set operation */
390
+ {
391
+ vector<FieldValueTuple> fields;
392
+ for (int j = 0 ; j < maxNumOfFields; j++)
393
+ {
394
+ FieldValueTuple t (field (j), value (j));
395
+ fields.push_back (t);
396
+ }
397
+ p.set (key, fields);
398
+ }
399
+
400
+ /* Prepare consumer */
401
+ ConsumerStateTable c (&db, tableName);
402
+ Select cs;
403
+ Selectable *selectcs;
404
+ cs.addSelectable (&c);
405
+
406
+ /* First pop operation */
407
+ {
408
+ int ret = cs.select (&selectcs);
409
+ EXPECT_EQ (ret, Select::OBJECT);
410
+ KeyOpFieldsValuesTuple kco;
411
+ c.pop (kco);
412
+ EXPECT_EQ (kfvKey (kco), key);
413
+ EXPECT_EQ (kfvOp (kco), " SET" );
414
+
415
+ auto fvs = kfvFieldsValues (kco);
416
+ EXPECT_EQ (fvs.size (), (unsigned int )maxNumOfFields);
417
+
418
+ map<string, string> mm;
419
+ for (auto fv: fvs)
420
+ {
421
+ mm[fvField (fv)] = fvValue (fv);
422
+ }
423
+
424
+ for (int j = 0 ; j < maxNumOfFields; j ++)
425
+ {
426
+ EXPECT_EQ (mm[field (j)], value (j));
427
+ }
428
+ }
429
+
430
+ /* Del operation and second set operation will be merged */
431
+
432
+ /* Del operation */
433
+ p.del (key);
434
+
435
+ /* Second set operation */
436
+ {
437
+ vector<FieldValueTuple> fields;
438
+ for (int j = 0 ; j < maxNumOfFields * 2 ; j += 2 )
439
+ {
440
+ FieldValueTuple t (field (j), value (j));
441
+ fields.push_back (t);
442
+ }
443
+ p.set (key, fields);
444
+ }
445
+
446
+ /* Second pop operation */
447
+ {
448
+ int ret = cs.select (&selectcs);
449
+ EXPECT_EQ (ret, Select::OBJECT);
450
+ KeyOpFieldsValuesTuple kco;
451
+ c.pop (kco);
452
+ EXPECT_EQ (kfvKey (kco), key);
453
+ EXPECT_EQ (kfvOp (kco), " SET" );
454
+
455
+ auto fvs = kfvFieldsValues (kco);
456
+
457
+ /* size of fvs should be maxNumOfFields, no "field 1" left from first set*/
458
+ EXPECT_EQ (fvs.size (), (unsigned int )maxNumOfFields);
459
+
460
+ map<string, string> mm;
461
+ for (auto fv: fvs)
462
+ {
463
+ mm[fvField (fv)] = fvValue (fv);
464
+ }
465
+
466
+ for (int j = 0 ; j < maxNumOfFields * 2 ; j += 2 )
467
+ {
468
+ EXPECT_EQ (mm[field (j)], value (j));
469
+ }
470
+ }
471
+
472
+ /* Get data directly from table in redis DB*/
473
+ Table t (&db, tableName);
474
+ vector<FieldValueTuple> values;
475
+ t.get (key, values);
476
+ /* size of values should be maxNumOfFields, no "field 1" left from first set */
477
+ EXPECT_EQ (values.size (), (unsigned int )maxNumOfFields);
478
+
479
+ /*
480
+ * Third pop operation, consumer received two consectuive signals.
481
+ * data depleted upon first one
482
+ */
483
+ {
484
+ int ret = cs.select (&selectcs, 1000 );
485
+ EXPECT_EQ (ret, Select::OBJECT);
486
+ KeyOpFieldsValuesTuple kco;
487
+ c.pop (kco);
488
+ EXPECT_EQ (kfvKey (kco), " " );
489
+ }
490
+
491
+ /* Third select operation */
492
+ {
493
+ int ret = cs.select (&selectcs, 1000 );
494
+ EXPECT_EQ (ret, Select::TIMEOUT);
495
+ }
496
+
497
+ /* State Queue should be empty */
498
+ RedisCommand keys;
499
+ keys.format (" KEYS %s*" , (c.getStateHashPrefix () + tableName).c_str ());
500
+ RedisReply r (&db, keys, REDIS_REPLY_ARRAY);
501
+ auto qlen = r.getContext ()->elements ;
502
+ EXPECT_EQ (qlen, 0U );
503
+ }
504
+
377
505
TEST (ConsumerStateTable, singlethread)
378
506
{
379
507
clearDB ();
0 commit comments